6. 基于Spring Data的领域事件发布

邓芝
• 阅读 3379

领域事件发布是一个领域对象为了让其它对象知道自己已经处理完成某个操作时发出的一个通知,事件发布力求从代码层面让自身对象与外部对象解耦,并减少技术代码入侵。

一、 手动发布事件

// 实体定义
@Entity
public class Department implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer departmentId;

    @Enumerated(EnumType.STRING)
    private State state;
}

// 事件定义
public class DepartmentEvent {
    private Department department;
    private State state;
    public DepartmentEvent(Department department) {
        this.department = department;
        state = department.getState();
    }
}

// 领域服务
@Service
public class ApplicationService {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Autowired
    private DepartmentRepository departmentRepository;

    @Transactional(rollbackFor = Exception.class)
    public void departmentAdd(Department department) {
        departmentRepository.save(department);
        // 事件发布
        applicationEventPublisher.publishEvent(new DepartmentEvent(department));
    }
}

使用applicationEventPublisher.publishEvent在领域服务处理完成后发布领域事件,此方法需要在业务代码中显式发布事件,并在领域服务里引入ApplicationEventPublisher类,但对领域服务本身有一定的入侵性,但灵活性较高。

二、 自动发布事件

// 实体定义
@Entity
public class SaleOrder implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer orderId;
   
    @Enumerated(EnumType.STRING)
    private State state;

    // 返回类型定义
    @DomainEvents
    public List<Object> domainEvents(){
        return Stream.of(new SaleOrderEvent(this)).collect(Collectors.toList());
    }

    // 事件发布后callback
    @AfterDomainEventPublication
    void callback() {
        System.err.println("ok");
    }
}

// 事件定义
public class SaleOrderEvent {
    private SaleOrder saleOrder;
    private State state;
    public SaleOrderEvent(SaleOrder saleOrder) {
        this.saleOrder = saleOrder;
        state = saleOrder.getState();
    }
}

// 领域服务
@Service
public class ApplicationService {
    @Autowired
    private OrderRepository orderRepository;
    
    @Transactional(rollbackFor = Exception.class)
    public void saleOrderAdd(SaleOrder saleOrder) {
        orderRepository.save(saleOrder);
    }
}

使用@DomainEvents定义事件返回的类型,必须是一个集合,使用@AfterDomainEventPublication定义事件发布后的回调。
此方法实事件类型定义在实体中,与领域服务完全解耦,没有入侵。系统会在orderRepository.save(saleOrder)后自动调用事件发布,另delete方法不会调用事件发布。

三、 事件监听

@Component
public class ApplicationEventProcessor {

    @EventListener(condition = "#departmentEvent.getState().toString() == 'SUCCEED'")
    public void departmentCreated(DepartmentEvent departmentEvent) {
        System.err.println("dept-event1:" + departmentEvent);
    }

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, condition = "#saleOrderEvent.getState().toString() == 'SUCCEED'")
    public void saleOrderCreated(SaleOrderEvent saleOrderEvent) {
        System.err.println("sale-event succeed1:" + saleOrderEvent);
    }

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT, condition = "#saleOrderEvent.getState().toString() == 'SUCCEED'")
    public void saleOrderCreatedBefore(SaleOrderEvent saleOrderEvent) {
        System.err.println("sale-event succeed2:" + saleOrderEvent);
    }

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void saleOrderCreatedFailed(SaleOrderEvent saleOrderEvent) {
        System.out.println("sale-event failed:" + saleOrderEvent);
    }
}

1. 使用@EventListener监听事件

@EventListener没有事务支持,只要事件发出就可监控到

@Transactional(rollbackFor = Exception.class)
public void departmentAdd(Department department) {
    departmentRepository.save(department);
    applicationEventPublisher.publishEvent(new DepartmentEvent(department));
    throw new RuntimeException("failed");
}

上述情况会造成事务失败回滚,但事件监控端已经执行,可能导致数据不一致的情况发生

2. 使用@TransactionalEventListener监听事件

  • TransactionPhase.BEFORE_COMMIT 事务提交前
  • TransactionPhase.AFTER_COMMIT 事务提交后
  • TransactionPhase.AFTER_ROLLBACK 事务回滚后
  • TransactionPhase.AFTER_COMPLETION 事务完成后

使用TransactionPhase.AFTER_COMMIT可在事务完成后,再执行事件监听方法,从而保证数据的一致性

3. TransactionPhase.AFTER_ROLLBACK回滚事务问题

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, condition = "#departmentEvent.getState().toString() == 'SUCCEED'")
public void departmentCreatedFailed(DepartmentEvent departmentEvent) {
    System.err.println("dept-event3:" + departmentEvent);
}

由于@DomainEvents作用在实体上的,只有刚orderRepository.save(saleOrder)执行成功后才会发送事件,故AFTER_ROLLBACK方法只会在同一事务中其它语句执行失败或显式rollback时才会执行,如果save方法执行失败,将不会监听到回滚事件。

4. @Async异步事件监听

  • 没有此注解事件监听方法与主方法为一个事务。
  • 使用此注解将脱离原有事务,BEFORE_COMMIT也无法拦截事务提交前时刻
  • 此注解需要配合@EnableAsync一起使用

四、 总结

通过对 @DomainEvents@TransactionalEventListener的使用,在有效的解决领域事件发布的情况下,减少了对业务代码的入侵,同时尽一步解决了数据一致性问题。
在分布式结构下,通过MQ发送事件通知给其它服务,为解决一致性问题,防止对方服务处理失败可先将事件保久化到数据库后,再重试。

五、 源码

https://gitee.com/hypier/barr...

6. 基于Spring Data的领域事件发布

点赞
收藏
评论区
推荐文章
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
java 的知识要多多了解。(下文为转发)
什么是发布对象? 发布对象是指使一个对象能够被当前范围之外的代码所使用 什么是对象逸出? 对象逸出是一种错误的发布,指当一个对象还没有构造完成时,就使它被其他线程所见 逸出demo!复制代码(https://oscimg.oschina.net/oscnet/237
Stella981 Stella981
3年前
EventBus源码分析
一、        EventBus简介1.1、EventBusEventBus是一个Android事件发布/订阅框架,通过解耦发布者和订阅者简化Android事件传递,这里的事件可以理解为消息,本文中统一称为事件。事件传递既可用于Android四大组件间通讯,也可以用户异步线程和主线程间通讯等等。传统的事件
Stella981 Stella981
3年前
ABP EventBus(事件总线)
事件总线就是订阅/发布模式的一种实现  事件总线就是为了降低耦合1.比如在winform中 到处都是事件 !(https://oscimg.oschina.net/oscnet/ed3426bf15550c4b0623956eb95e826780d.png)触发事件的对象 sender事件的数据  e事件的处理逻辑 方法
Stella981 Stella981
3年前
NET Core Web API下事件驱动型架构CQRS架构中聚合与聚合根的实现
NETCoreWebAPI下事件驱动型架构在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,我打算首先介
Easter79 Easter79
3年前
Spring中ApplicationContext的事件机制
   ApplicationContext事件机制是观察者设计模式的实现,通过ApplicationEvent类和ApplicationListener接口,可以实现ApplicationContext事件处理。如果容器中有一个ApplicationListenerBean,每当ApplicationContext发布ApplicationEvent时,
Stella981 Stella981
3年前
Noark入门之异步事件
引入异步事件主要是为了各模块的解耦,每当完成一个动作时,向系统发布一个事件,由关心的模块自己监听处理,可选择同步处理,异步处理,延迟处理。何时发布事件,当其他模块关心此动作时<br比如获得道具时,任务系统模块要判定完成进度,BI模块需要上报等等都可以监听此事件,已达模块解耦0x00事件源一个实现xyz.noark.core.event
Wesley13 Wesley13
3年前
#分布式系统架构之# 事件驱动模式以及与之匹配的长时间处理过程讨论
     在分布式系统下,可以很多种架构从事设计,或者分布式系统对技术架构本身没有做严格的限制。但是结合自己的实践以及基于《领域驱动设计》的推荐,采用【事件驱动模式】是比较好的一种分布式系统架构方式。该模式充分实现了不同系统之间的代码解耦,所有的业务流转是通过事件广播进行驱动的。所有业务都是在针对名为【事件总线】的组件在编程,也无需知道事件的生产者
Wesley13 Wesley13
3年前
DDD领域驱动设计实战(六)
点击上方“JavaEdge”,关注公众号设为“星标”,好文章不错过!1定义将领域中所发生的活动建模成一系列的离散事件。每个事件都用领域对象来表示。领域事件是领域模型的组成部分,表示领域中所发生的事情。一个领域事件将导致进一步的业务操作,在实现业务解耦的同时,还有助于形成完整的业务闭环。
Wesley13 Wesley13
3年前
MNE
点击上面"脑机接口社区"关注我们更多技术干货第一时间送达大家好!今天Rose小哥结合案例代码给大家介绍一下MNE是如何从Raw对象中解析event的。这篇内容主要描述了如何从原始记录中读取实验事件,以及如何在MNEPython中事件的两种不同表示形式(事件数组和注释对象)之间进行转换。在入门教程中,我们看到了
邓芝
邓芝
Lv1
小楼一夜听春雨,深巷明朝卖杏花。
文章
3
粉丝
0
获赞
0