云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作

赖尚荣
• 阅读 116

方案概述

在大数据 ETL 场景,将 Kafka 中的消息流转到其他下游服务是很常见的场景,除了常规的消息流转外,很多场景还需要基于消息体内容做判断,然后决定下游服务做何种操作。

该方案实现了通过 Kafka 中消息 Key 的内容来判断应该对 MongoDB 做增、删、改的哪种 DML 操作。

当 Kafka 收到消息后,会自动触发函数计算中的函数,接收到消息,对消息内容做判断,然后再操作 MongoDB。用户可以对提供的默认函数代码做修改,来满足更复杂的逻辑。

整体方案通过 CADT 可以一键拉起依赖的产品,并完成了大多数的配置,用户只需要到函数计算和 MongoDB 控制台做少量配置即可。

方案优势

  • 可以实现根据 Kafka 消息的具体内容判断,该对 MongoDB 做哪种 DML 操作,灵活性和可扩展性极高。
  • 函数计算具有完善的日志系统、容错机制。可以清晰的看到对每条消息的处理日志,如果逻辑执行失败,也有重试机制和函数失败补偿机制,保证业务数据的完整性和一致性。

详情可参见文档:

https://help.aliyun.com/zh/fc/user-guide/retry-policy

https://help.aliyun.com/zh/fc/result-callback

方案限制: 目前源 Kafka 只支持阿里云 Kafka。

部署架构

云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作

架构说明

该架构图直观的表现出了该方案中使用到的网络(VPC,交换机,安全组)、Kafka、函数计算 FC、MongoDB 之间的关系。

网络架构:

  • 整个方案会在某个 Region 下,该示例使用的是北京 Region
  • 在 Region 下会创建一个 VPC

    • 在该 VPC 下会创建一个某可用区的交换机,该示例使用的是 G 可用区
    • 在该 VPC 下会创建一个安全组
  • Kafka,FC,MongoDB 都在该 VPC 的 G 可用区的交换机下
  • FC 在与 VPC 其他资源互通时会使用到 VPC 下的安全组

产品介绍

专有网络 VPC(Virtual Private Cloud): 是用户基于阿里云创建的自定义私有网络, 不同的专有网络之间二层逻辑隔离,用户可以在自己创建的专有网络内创建和管理云产品实例,比如 ECS、负载均衡、RDS 等。

函数计算 FC(Function Compute): 函数计算是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码或镜像。函数计算为您准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。

云消息队列 Kafka 版: 云消息队列 Kafka 版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。云消息队列 Kafka 版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。

云数据库 MongoDB 版(ApsaraDB for MongoDB): 完全兼容 MongoDB 协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份恢复、性能优化等功能。

云速搭 CADT(Cloud Architect Design Tools): 是一款为上云应用提供自助式云架构管理的产品,显著地降低应用云上管理的难度和时间成本。本产品提供丰富的预制应用架构模板,同时也支持自助拖拽方式定义应用云上架构;支持较多阿里云服务的配置和管理。用户可以方便的对云上架构方案的成本、部署、运维、回收进行全生命周期的管理。

前置条件

在进行本文操作之前,您需要完成以下准备工作:

1)注册阿里云账号,并完成实名认证。您可以登录阿里云控制台,并前往实名认证页面 https://account.console.aliyun.com/v2/#/authc/home 查看是否完成实名认证。

2)购买按量付费资源,阿里云账户余额需要大于 100 元。考虑到部署后每小时会产生费用,建议账号内余额或者代金卷金额大于 200 元。您可以登录阿里云控制台,前往账户总览页面 https://usercenter2.aliyun.com/home 查看账户余额。

操作步骤

本实践可通过 CADT 官方模板快速拉起演示环境。

  1. 基础环境搭建
  2. 配置 MongoDB

    2.1. 设置白名单

    2.2. 记录 MongoDB 连接地址

    2.3. 创建 MongoDB 库和集合

    2.4. 查询 MongoDB 中的数据

  3. 配置函数计算 FC

    3.1. 登录函数计算 FC 控制台

    3.2. 配置函数环境变量

    3.3. 配置函数实例生命周期回调

    3.4. 配置函数的层

    3.5. 配置函数代码

  4. 场景验证

    4.1. 阿里云 Kafka 模拟发送消息

    4.2. 查询 MongoDB 数据

    4.3. 验证更多场景

  5. 一键释放资源

最佳实践全部内容,请点击此处查看。对方案和产品感兴趣的朋友,可以加入钉钉群交流(群号:31852400)。

往期文章:

云原生最佳实践系列 1:借助云速搭 CADT 如何实现 Kafka 的性能压测?

云原生最佳实践系列2:基于 MSE 云原生网关同城多活

云原生最佳实践系列 3:基于 SpringCloud 应用玩转 MSE

云原生最佳实践系列 4:基于 MSE 和 SAE 的微服务部署与压测

点赞
收藏
评论区
推荐文章
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Stella981 Stella981
4年前
Knative 实战:基于阿里云 Kafka 实现消息推送
在Knative中已经提供了对Kafka事件源的支持,那么如何在阿里云上基于Kafka实现消息推送,本文给大家解锁这一新的姿势。背景消息队列forApacheKafka是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列forApacheKafka广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等
Stella981 Stella981
4年前
Knative 实战:基于 Kafka 实现消息推送
!1(https://yqfile.alicdn.com/01ee67ae04a648166d18ad84af8538056340b4f8.png)作者| 元毅 阿里云智能事业群高级开发工程师导读:当前在Knative中已经提供了对Kafka事件源的支持,那么如何基于Kafka实现消息推送呢?本文作者将以阿里云Kafk
Stella981 Stella981
4年前
Kafka初入门简单配置与使用
一Kafka概述1.1Kafka是什么在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。1)ApacheKafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。2)Kafka最初是由LinkedIn开发
Stella981 Stella981
4年前
Kafka 简介
Kafka简介_Kafka是分布式流平台。_一个流平台有3个主要特征:发布和订阅消息流,这一点与传统的消息队列相似。以容灾持久化方式的消息流存储。在消息流发生时处理消息流。Kafka通常使用在两大类应用中:在系统或应用之间,构建实时、可靠的消息流管道。构建实时流应用
Stella981 Stella981
4年前
FusionInsight大数据开发
Kafka应用开发1.了解Kafka应用开发适用场景2.熟悉Kafka应用开发流程3.熟悉并使用Kafka常用API4.进行Kafka应用开发Kafka的定义Kafka是一个高吞吐、分布式、基于发布订阅的消息系统Kafka有如下几个特点:1.高吞吐量2.消息持久化到磁
Stella981 Stella981
4年前
Kafka实战解惑
一、Kafka简介Kafka是LinkedIn使用Scala开发的一个分布式消息中间件,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理、ETL等应用场景。Kafka具有以下主要特点:\\消息的发布、订阅均具有高吞吐量:\\据统计数字表明,Kafka每秒可以生产约25万消息(50MB),每秒处理55万消息(110MB)。
Stella981 Stella981
4年前
Kafka 原理详解
Kafka原理详解1kakfa基础概念说明Broker:消息服务器,就是我们部署的一个kafka服务Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存储Replica:消息的副本,即备份消息,存储在其他的broker上,当leader挂掉
扫盲Kafka?看这一篇就够了! | 京东云技术团队
kafka的使用场景为什么要使用Kafka消息队列?解耦、削峰:传统的方式上游发送数据下游需要实时接收,如果上游在某些业务场景:例如上午十点会流量激增至顶峰,那么下游资源可能会扛不住压力。但如果使用消息队列,就可以将消息暂存在消息管道中,下游可以按照自己的
扫盲Kafka?看这一篇就够了!
作者:京东科技于添馨kafka的使用场景为什么要使用Kafka消息队列?解耦、削峰:传统的方式上游发送数据下游需要实时接收,如果上游在某些业务场景:例如上午十点会流量激增至顶峰,那么下游资源可能会扛不住压力。但如果使用消息队列,就可以将消息暂存在消息管道中
线上SQL超时场景分析-MySQL超时之间隙锁 | 京东物流技术团队
前言之前遇到过一个由MySQL间隙锁引发线上sql执行超时的场景,记录一下。背景说明分布式事务消息表:业务上使用消息表的方式,依赖本地事务,实现了一套分布式事务方案消息表名:mqmessages数据量:3000多万索引:createtime和statuss
赖尚荣
赖尚荣
Lv1
少小离家老大回,乡音无改鬓毛衰。
文章
6
粉丝
0
获赞
0