Flink实战-订单支付和对账情况监控(分别使用CEP和ProcessFunction来实现)

关注功
• 阅读 3089

在电商网站中,订单的支付作为直接与钱挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间没支付的订单就会被取消。另外,对于订单的支付,还应该保证最终支付的正确性,可以通过第三方支付平台的交易数据来做一个实时对账

第一个实现的效果,实时获取订单数据,分析订单的支付情况,分别实时统计支付成功的和15分钟后支付超时的情况

新建一个maven项目,这是基础依赖,如果之前引入了,就不用加了

<properties>

 <maven.compiler.source>`8`</maven.compiler.source>
 <maven.compiler.target>`8`</maven.compiler.target>
 <flink.version>`1.10.1`</flink.version>
 <scala.binary.version>`2.12`</scala.binary.version>
 <kafka.version>`2.2.0`</kafka.version>
 </properties>
 <dependencies>
 <dependency>
 <groupId>`org.apache.flink`</groupId>
 <artifactId>`flink-scala_${scala.binary.version}`</artifactId>
 <version>`${flink.version}`</version>
 </dependency>
 <dependency>
 <groupId>`org.apache.flink`</groupId>
 <artifactId>`flink-streaming-scala_${scala.binary.version}`</artifactId>
 <version>`${flink.version}`</version>
 </dependency>
 <dependency>
 <groupId>`org.apache.kafka`</groupId>
 <artifactId>`kafka_${scala.binary.version}`</artifactId>
 <version>`${kafka.version}`</version>
 </dependency>
 <dependency>
 <groupId>`org.apache.flink`</groupId>
 <artifactId>`flink-connector-kafka_${scala.binary.version}`</artifactId>
 <version>`${flink.version}`</version>
 </dependency>
 <dependency>
 <groupId>`cn.hutool`</groupId>
 <artifactId>`hutool-all`</artifactId>
 <version>`5.5.6`</version>
 </dependency>
 <dependency>
 <groupId>`org.apache.flink`</groupId>
 <artifactId>`flink-table-planner-blink_2.12`</artifactId>
 <version>`1.10.1`</version>
 </dependency>
 </dependencies>

这个场景需要用到cep,所以再加入cep依赖

<dependencies>

 <dependency>
 <groupId>`org.apache.flink`</groupId>
 <artifactId>`flink-cep-scala_${scala.binary.version}`</artifactId>
 <version>`${flink.version}`</version>
 </dependency>
 </dependencies>

准备数据源文件src/main/resources/OrderLog.csv:

`1234,`**create**`,,`1611047605

1235,create,,1611047606

1236,create,,1611047606

1234,pay,akdb3833,1611047616

把java目录改为scala,新建com.mafei.orderPayMonitor.OrderTimeoutMonitor.scala 的object

/*

*

* @author mafei

* @date 2021/1/31

*/

package com.mafei.orderPayMonitor
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import java.util

/**

* _定义输入样例类类型,_

*

* @param orderId _订单id_

* @param eventType _事件类别: 创建订单create还是支付订单pay_

* @param txId _支付流水号_

* @param ts _时间_

*/

case class OrderEvent(orderId: Long, eventType:String,txId: String, ts: Long)

/**

* _定义输出样例类类型,_

*/

case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeoutMonitor {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 `env.setParallelism(`1`)`
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 _// 1__、从文件中读取数据_
 `val resource = getClass.getResource(`"/OrderLog.csv"`)`
 val orderEvnetStream = env.readTextFile(resource.getPath)
 .map(d=>{
 `val arr = d.split(`","`)`
 `OrderEvent(arr(`0`).toLong,arr(`1`),arr(`2`), arr(`3`).toLong)` _//__把数据读出来转换成想要的样例类类型_
 `}).assignAscendingTimestamps(_.ts *` 1000`L)` _//__指定ts字段_
 `.keyBy(_.orderId)` _//__按照订单id分组_
 _/**_

* 2__、定义事件-匹配模式

* _定义15分钟内能发现订单创建和支付_

*/

 val orderPayPattern = Pattern
 `.begin[OrderEvent](`"create"`).where(_.eventType ==` "create"`)` _//__先出现一个订单创建的事件_
 `.followedBy(`"pay"`).where(_.eventType ==` "pay"`)` _//__后边再出来一个支付事件_
 `.within(Time.minutes(`15`))` _//__定义在15分钟以内,触发这2个事件_
 _// 3__、将pattern应用到流里面,进行模式检测_
 val patternStream = CEP.pattern(orderEvnetStream, orderPayPattern)
 _//4__、定义一个侧输出流标签,用于处理超时事件_
 `val orderTimeoutTag = new OutputTag[OrderResult](`"orderTimeout"`)`
 _// 5__、调用select 方法,提取并处理匹配的成功字符事件以及超时事件_
 val resultStream = patternStream.select(
 orderTimeoutTag,
 new OrderTimeoutSelect(),
 new OrderPaySelect()
 )
 `resultStream.print(`"pay"`)`
 resultStream.getSideOutput(orderTimeoutTag).print()
 `env.execute(`" order timeout monitor"`)`
 }
}

//__获取超时之后定义的事件还没触发的情况,也就是订单支付超时了。

class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{

 override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
 `val timeoutOrderId = map.get(`"create"`).iterator().next().orderId`
 `OrderResult(timeoutOrderId,` "超时了。。。。超时时间:"`+l)`
 }
}

class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{

 override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
 `val orderTs = map.get(`"create"`).iterator().next().ts`
 `val paydTs = map.get(`"pay"`).iterator().next().ts`
 `val payedOrderId = map.get(`"pay"`).iterator().next().orderId`
 `OrderResult(payedOrderId,` "订单支付成功,下单时间:"`+orderTs+`" 支付时间:"`+paydTs)`
 }
}

用ProcessFunction来实现上面的场景
csv还可以用上面的数据,新建一个scala的object src/main/scala/com/mafei/orderPayMonitor/OrderTimeoutMonitorWithProcessFunction.scala

/*
 *
 * @author mafei
 * @date 2021/1/31
*/
package com.mafei.orderPayMonitor
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector
object OrderTimeoutMonitorWithProcessFunction {
 def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 1、从文件中读取数据

val resource = getClass.getResource("/OrderLog.csv")

val orderEventStream = env.readTextFile(resource.getPath)

.map(d=>{

val arr = d.split(",")

OrderEvent(arr(0).toLong,arr(1),arr(2), arr(3).toLong) //把数据读出来转换成想要的样例类类型

}).assignAscendingTimestamps(_.ts * 1000L) //指定ts字段

.keyBy(_.orderId) //按照订单id分组

val resultStream = orderEventStream

.process(new OrderPayMatchProcess())

resultStream.print("支付成功的: ")

resultStream.getSideOutput(new OutputTag[OrderResult]).print("订单超时事件")

env.execute("订单支付监控with ProcessFunction")

 }
}
class OrderPayMatchProcess() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{
 // 
先定义状态标识,标识create、payed、是否已经出现,以及对应的时间戳
 `lazy val isCreateOrderState: ValueState[`Boolean`] = getRuntimeContext.getState(new ValueStateDescriptor[`Boolean`](`"isCreateOrderState", classOf[Boolean]`))`
 `lazy val isPayedOrderState: ValueState[`Boolean`] = getRuntimeContext.getState(new ValueStateDescriptor[`Boolean`](`"isPayedOrderState", classOf[Boolean]`))`
 `lazy val timerTsState : ValueState[`Long`] = getRuntimeContext.getState(new ValueStateDescriptor[`Long`](`"timerTsState", classOf[Long]`))`
 // 
定义一个侧输出流,捕获timeout的订单信息
 `val orderTimeoutOutputTag = new OutputTag[`OrderResult`](`"timeout"`)`
 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {

//到这里,肯定不会出现订单创建和支付同时存在的情况,因为会在processElement处理掉

//如果只有订单创建

if (isCreateOrderState.value()){

ctx.output(orderTimeoutOutputTag,OrderResult(ctx.getCurrentKey,"订单没支付或超时"))

}else if(isPayedOrderState.value()){

ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey,"只有支付,没看到订单提交"))

}

isCreateOrderState.clear()

isPayedOrderState.clear()

timerTsState.clear()

 }
 override def processElement(i: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {

/**

  • 判断当前事件类型,是create还是pay
  • 分几种情况:
  • 1、判断create和pay都来了
  • 要看有没有超时,没有超时就正常输出
  • 超时了输出到侧输出流
  • 2、create或者pay有一个没来
  • 注册一个定时器等着,然后等定时器触发后再输出

*

*/

val isCreate = isCreateOrderState.value()

val isPayed = isPayedOrderState.value()

val timerTs = timerTsState.value()

// 1、create来了

if (i.eventType == "create"){

// 1.1 如果已经支付过了,那是正常支付完成,输出匹配成功的结果

if (isPayed){

isCreateOrderState.clear()

isPayedOrderState.clear()

timerTsState.clear()

context.timerService().deleteEventTimeTimer(timerTs)

collector.collect(OrderResult(context.getCurrentKey,"支付成功"))

}else{ //如果没有支付过,那注册一个定时器,等待15分钟后触发

context.timerService().registerEventTimeTimer(i.ts)

timerTsState.update(i.ts 1000L + 9001000L)

isCreateOrderState.update(true)

}

}

else if(i.eventType == "pay"){ //如果当前事件是支付事件

if(isCreate){ //判读订单创建事件已经发生

if(i.ts * 1000L < timerTs){ // 创建订单到支付的时间在超时时间内,代表正常支付

collector.collect(OrderResult(context.getCurrentKey,"支付成功"))

}else{

context.output(orderTimeoutOutputTag, OrderResult(context.getCurrentKey,"已经支付,但是没有找到订单超时了"))

}

isCreateOrderState.clear()

isPayedOrderState.clear()

timerTsState.clear()

context.timerService().deleteEventTimeTimer(timerTs)

}else{ //如果没看到订单创建的事件,那就注册一个定时器等着

context.timerService().registerEventTimeTimer(i.ts)

isPayedOrderState.update(true)

timerTsState.update(i.ts)

}

}

 }
`}`

上面实现了监测用户支付的情况,实际中还需要对支付后的账单跟第三方支付平台做一个实时对账功能

会涉及到2条源码交易数据流(支付和账单)的合流计算

这里模拟账单,所以需要准备一个数据ReceiptLog.csv

akdb3833,alipay,1611047619
`akdb3832,wechat,1611049617`

上代码: src/main/scala/com/mafei/orderPayMonitor/TxMatch.scala

/*
 *
 * @author mafei
 * @date 2021/1/31
*/
package com.mafei.orderPayMonitor
import com.mafei.orderPayMonitor.OrderTimeoutMonitor.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector
case class ReceiptEvent(orderId: String, payChannel:String, ts: Long)
object TxMatch {
 def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 1、从订单文件中读取数据

val resource = getClass.getResource("/OrderLog.csv")

val orderEventStream = env.readTextFile(resource.getPath)

.map(d=>{

val arr = d.split(",")

OrderEvent(arr(0).toLong,arr(1),arr(2), arr(3).toLong) //把数据读出来转换成想要的样例类类型

}).assignAscendingTimestamps(_.ts * 1000L) //指定ts字段

.filter(_.eventType=="pay")

.keyBy(_.txId) //按照交易id分组

// 2、从账单中读取数据

val receiptResource = getClass.getResource("/ReceiptLog.csv")

val receiptEventStream = env.readTextFile(receiptResource.getPath)

.map(d=>{

val arr = d.split(",")

ReceiptEvent(arr(0),arr(1),arr(2).toLong) //把数据读出来转换成想要的样例类类型

}).assignAscendingTimestamps(_.ts * 1000L) //指定ts字段

.keyBy(_.orderId) //按照订单id分组

// 3、合并两条流,进行处理

val resultStream = orderEventStream.connect(receiptEventStream)

.process(new TxPayMatchResult())

resultStream.print("match: ")

resultStream.getSideOutput(new OutputTag[OrderEvent]).print("unmatched-pay")

resultStream.getSideOutput(new OutputTag[ReceiptEvent]).print("unmatched-receipt")

env.execute()

 }
}
class TxPayMatchResult() extends CoProcessFunction[OrderEvent,ReceiptEvent,(OrderEvent,)]{
 `lazy val orderEventState: ValueState[`OrderEvent`] = getRuntimeContext.getState(new ValueStateDescriptor[`OrderEvent`]
 `lazy val receiptEventState: ValueState[`ReceiptEvent`] = getRuntimeContext.getState(new ValueStateDescriptor[`ReceiptEvent`](`"payEvent", classOf[ReceiptEvent]`))`
 // 
定义自定义侧输出流
 `val unmatchedOrderEventTag = new OutputTag[`OrderEvent`](`"unmatched-pay"`)`
 `val unmatchedReceiptEventTag = new OutputTag[`ReceiptEvent`](`"receipt"`)`
 override def processElement1(in1: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {

//判断支付账单来了

val receiptEvent = receiptEventState.value()

if(receiptEvent != null){

//如果账单已经过来了,那直接输出

collector.collect((in1,receiptEvent))

orderEventState.clear()

receiptEventState.clear()

}else{

//如果没来,那就注册一个定时器,等待10秒钟

context.timerService().registerEventTimeTimer(in1.ts*1000L + 10000L)

orderEventState.update(in1)

}

 }
 override def processElement2(in2: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {

//判断支付事件来了

val orderEvent = orderEventState.value()

if(orderEvent != null){

//如果账单已经过来了,那直接输出

collector.collect((orderEvent,in2))

orderEventState.clear()

receiptEventState.clear()

}else{

//如果没来,那就注册一个定时器,等待2秒钟

context.timerService().registerEventTimeTimer(in2.ts*1000L + 2000L)

receiptEventState.update(in2)

}

 }
 override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {

if(orderEventState.value() != null){

ctx.output(unmatchedOrderEventTag, orderEventState.value())

}

else if(receiptEventState.value() != null){

ctx.output(unmatchedReceiptEventTag, receiptEventState.value())

}

orderEventState.clear()

receiptEventState.clear()

 }
`}`

第二种, 使用join来实现这个效果
这种方式优点是跟方便了,做了一层封装,缺点也很明显如果要实现一些复杂情况如没匹配中的也输出之类的就不行了,具体看实际场景需要

/*

*

* @author mafei

* @date 2021/1/31

*/

package com.mafei.orderPayMonitor
import com.mafei.orderPayMonitor.TxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object TxMatchWithJoin {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 `env.setParallelism(`1`)`
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 _// 1__、从订单文件中读取数据_
 `val resource = getClass.getResource(`"/OrderLog.csv"`)`
 val orderEventStream = env.readTextFile(resource.getPath)
 .map(d=>{
 `val arr = d.split(`","`)`
 `OrderEvent(arr(`0`).toLong,arr(`1`),arr(`2`), arr(`3`).toLong)` _//__把数据读出来转换成想要的样例类类型_
 `}).assignAscendingTimestamps(_.ts *` 1000`L)` _//__指定ts字段_
 `.filter(_.eventType==`"pay"`)`
 `.keyBy(_.txId)` _//__按照交易id分组_
 _// 2__、从账单中读取数据_
 `val receiptResource = getClass.getResource(`"/ReceiptLog.csv"`)`
 val receiptEventStream = env.readTextFile(receiptResource.getPath)
 .map(d=>{
 `val arr = d.split(`","`)`
 `ReceiptEvent(arr(`0`),arr(`1`),arr(`2`).toLong)` _//__把数据读出来转换成想要的样例类类型_
 `}).assignAscendingTimestamps(_.ts *` 1000`L)` _//__指定ts字段_
 `.keyBy(_.orderId)` _//__按照订单id分组_
 val resultStream = orderEventStream.intervalJoin(receiptEventStream)
 `.between(Time.seconds(`-3`), Time.seconds(`5`))`
 .process(new TxMatchWithJoinResult())
 resultStream.print()
 env.execute()
 }
}

class TxMatchWithJoinResult() extends ProcessJoinFunction[OrderEvent, ReceiptEvent,(OrderEvent,ReceiptEvent)]{

 `override def processElement(in1: OrderEvent, in2: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]`**#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {**
 collector.collect((in1,in2))
 }
`}`
点赞
收藏
评论区
推荐文章
ThinkPHP V5.0 接入微信支付+回调
ThinkPHPV5.0接入微信支付微信支付接口组装访问数组$data'body''商城购买商品';//订单标题$data'outtradeno'generaterandstr(8,0);//平台订单号(非小程序订单,自己平台生成的)$data'notifyurl'$thisrequestdomain().'';//微信支付
Easter79 Easter79
3年前
thinkcmf+jsapi 实现微信支付
首先从小程序端接收订单号、金额等参数,然后后台进行统一下单,把微信支付的订单号返回,在把订单号发送给前台,前台拉起支付,返回参数后更改支付状态。。。回调publicfunctionnotify(){$wechatDb::name('wechat')where('status',1)find();
Wesley13 Wesley13
3年前
java程序支付宝接口付费功能的实现
以前做过c应用程序支付宝api接口功能,现在转移到Java程序上,代码如何实现呢?1、从你的网站提交到支付宝:/\\\将订单提交支付宝进行网上支付\/publicActionForwardsubmitAlipayUrl(ActionMappingmapping,ActionFormform,HttpServletReque
Stella981 Stella981
3年前
SpringBoot+Redis+拦截器+自定义注解实现接口幂等性
一、概念任意多次执行所产生的影响均与一次执行的影响相同。按照这个含义,最终的含义就是对数据库的影响只能是一次性的,不能重复处理。比如:订单接口,不能多次创建订单。支付接口,重复支付同一笔订单只能扣一次钱。支付宝回调接口,可能会多次回调,必须处理重复回调。普通表单提交接口
Stella981 Stella981
3年前
Flink双流实时对账
!(https://oscimg.oschina.net/oscnet/31a0748e049f466cb5a3eb5ac355307e.jpg)背景在电商、金融、银行、支付等涉及到金钱相关的领域,为了安全起见,一般都有对账的需求。比如,对于订单支付事件,用户通过某宝付款,虽然用户支付成功,但是用户支付完成后并
Easter79 Easter79
3年前
SpringBoot+Redis+拦截器+自定义注解实现接口幂等性
一、概念任意多次执行所产生的影响均与一次执行的影响相同。按照这个含义,最终的含义就是对数据库的影响只能是一次性的,不能重复处理。比如:订单接口,不能多次创建订单。支付接口,重复支付同一笔订单只能扣一次钱。支付宝回调接口,可能会多次回调,必须处理重复回调。普通表单提交接口
Easter79 Easter79
3年前
SpringBoot+RabbitMQ+Redis实现商品秒杀
业务分析一般而言,商品秒杀大概可以拆分成以下几步:1.用户校验校验是否多次抢单,保证每个商品每个用户只能秒杀一次2.下单订单信息进入消息队列,等待消费3.减少库存消费订单消息,减少商品库存,增加订单记录4.付款十五分钟内完成支付,修改支付状态创建表goods\_info商品库存表
Stella981 Stella981
3年前
Spring Boot与RabbitMQ结合实现延迟队列的示例
背景何为延迟队列?顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。场景二:用户希望通过手机远程遥控
Stella981 Stella981
3年前
RabbitMQ队列延迟
RabbitMQ队列延迟1\.场景:“订单下单成功后,15分钟未支付自动取消”1.传统处理超时订单采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再
Stella981 Stella981
3年前
SpringBoot+RabbitMQ+Redis实现商品秒杀
业务分析一般而言,商品秒杀大概可以拆分成以下几步:1.用户校验校验是否多次抢单,保证每个商品每个用户只能秒杀一次2.下单订单信息进入消息队列,等待消费3.减少库存消费订单消息,减少商品库存,增加订单记录4.付款十五分钟内完成支付,修改支付状态创建表goods\_info商品库存表
关注功
关注功
Lv1
保持对生活的爱和热忱,把每一天活得热气腾腾。
文章
5
粉丝
0
获赞
0