博文推荐|使用 Apache Pulsar 和 Scala 进行事件流处理

智码领航员
• 阅读 1348
本文翻译自《Event Streaming with Apache Pulsar and Scala》,作者 Giannis 。

译者信息:姚余钱@深圳觉行科技有限公司,致力于医疗大数据领域。热衷开源,活跃于 Apache Pulsar 社区。

本文作者 Giannis Polyzos,StreamNative 高级工程师,主攻 Apache Pulsar 方向。Apache Pulsar 是云原生消息流平台,拥有广阔前景。在本文中,他将介绍 Pulsar 是什么以及它出色的性能,然后通过快速教程以帮助读者入门 Scala 语言运行 Pulsar。

文章摘要

在现代数据时代,对尽可能快地提供数据洞察的需求不断增加。“当前正在”发生的事情在几分钟甚至几秒钟后就可能变得无关紧要,因此越来越需要尽可能快地接收和处理事件——无论是为了改善业务使其在要求苛刻的市场中更具竞争力,还是为了使一个系统能根据其所受到的环境刺激而自我成长和适应。

随着容器和云基础设施的发展,公司在寻求利用和采用云原生的办法。迁移到云端并在系统中采用容器意味着我们很可能会利用 Kubernetes 等技术来实现其所有惊人的功能。将基础架构放置云端并采用云原生解决方案意味着很多用户也希望其消息传递和流解决方案符合这些原则。

在这篇文章中,我们将介绍如何使用 Apache Pulsar 和 Scala 实现云原生事件流处理。我们将回顾 Apache Pulsar 在这个现代数据时代须具备的能力,是什么让它脱颖而出,以及如何通过使用 Scala 和 pulsar4s 库创建一些简单的生产者和消费者来运行它。

1. 什么是 Apache Pulsar

如文档中所述,

Apache Pulsar 是一个云原生、分布式消息和流平台,每天管理数千亿个事件。

它最初是在 2013 年由 Yahoo 创建的,以满足其巨大的扩展需求 - 工程团队当时也审查了类似 Apache Kafka 等解决方案(尽管这些系统此后有了很大的发展),但并没有完全满足他们的需求。

其它系统缺少跨地域复制、多租户和偏移量管理等特性,以及处理消息积压情况下的性能,因此 Apache Pulsar 诞生了。

让我们仔细看看是什么让它脱颖而出:

  1. 统一消息和流两种场景:关于 Apache Pulsar,您应该注意的第一件事是,它是消息和流的统一平台。消息和流这两个术语经常被混为一谈,实际存在根本差异。例如在消息传递的场景中,用户希望消息一到就立刻消费它,然后将该消息删除;然而,对于流处理的场景,用户可能希望保留消息并能够重现它们。
  2. 多租户:Apache Pulsar 从一开始就被设计成一个多租户系统。您可以将多租户视为不同的用户组,每个用户组都在自己的隔离环境中运行。Pulsar 的逻辑架构由租户、命名空间和主题组成。命名空间是租户内主题的逻辑分组。您可以使用定义的层次结构轻松映射组织的需求,并提供隔离、身份验证、授权、配额以及在命名空间和主题级别应用不同的策略。电子商务业务的多租户示例如下,将 WebBanking 和 Marketing 等不同部门作为租户,然后这些部门成员可以在租户内进行操作。

博文推荐|使用 Apache Pulsar 和 Scala 进行事件流处理

  1. 跨地域复制:跨地域复制就是通过在跨地域上分布的不同集群的数据中心提供数据副本来保证容灾能力。Apache Pulsar 提供开箱即用的跨地域复制,无需外部工具。类似 Apache Kafka 的替代方案依赖于第三方解决方案 —— 即 MirrorMaker - 来解决此类已知存在问题的场景。使用 Pulsar,您可以通过强大的内置跨地域复制克服这些问题,并设计满足您需求的灾难恢复解决方案。
  2. 水平扩展:Apache Pulsar 的架构由三个组件组成。Pulsar Broker 是无状态服务层,Apache BookKeeper(bookie 服务器)作为存储层,最后 Apache ZooKeeper 作为元数据层 —— 尽管 2.8.0 版本引入了元数据层作为替代(参考 PIP 45)。所有层都彼此分离,这意味着您可以根据需要独立地扩展每个组件。Apache BookKeeper 使用分布式 ledger 的概念而不是基于日志的抽象,这使得它非常容易扩展,无需重新平衡。这也使得 Apache Pulsar 非常适合云原生环境。
  3. 分层存储:当您处理大量数据时,主题可能会无限制地变大,随着时间的推移,(存储成本)可能会变得非常昂贵。Apache Pulsar 提供了分层存储,因此随着主题的增长,您可以将旧数据卸载到一些更便宜的存储中(例如 Amazon S3),而您的客户端仍然可以访问数据并继续提供服务,就好像什么都没有发生过一样。
  4. Pulsar Functions:Pulsar Functions 是一个轻量级的无服务器计算框架,允许您以非常简单的方式部署自己的流处理逻辑。轻量级也使其成为物联网边缘分析用例的绝佳选择。

Apache Pulsar 还有很多特性,比如内置的 Schema Registry、对事务和 Pulsar SQL 的支持,但现在让我们看看如何真正启动和运行 Pulsar,并用 Scala 创建我们的第一个生产者和消费者。

2. 场景和集群设置示例

以简单的场景作为例子,我们创建一个读取模拟传感器事件的生产者,将事件发送到一个主题,然后在另一端创建订阅该主题并仅读取传入事件的消费者。我们将使用 pulsar4s client 库进行实现,同时使用 docker 运行 Pulsar 集群。为了以单机模式启动 Pulsar 集群,请在终端中运行以下命令:

docker run -it \ 
    -p 6650:6650 \ 
    -p 8080:8080 \ 
    --name pulsar \ 
    apachepulsar/pulsar:2.8.0 \ 
    bin/pulsar standalone

该命令将启动 Pulsar 并将必要的端口绑定到本地计算机。随着集群的启动运行,可以开始创建生产者和消费者。

3. Apache Pulsar 生产者

首先,需要有 pulsar4s-core 和 pulsar4s-circe 依赖——所以我们需要将以下内容添加到 build.sbt 文件中:

val pulsar4sVersion = "2.7.3"

lazy val pulsar4s       = "com.sksamuel.pulsar4s" %% "pulsar4s-core"  % pulsar4sVersion
lazy val pulsar4sCirce  = "com.sksamuel.pulsar4s" %% "pulsar4s-circe" % pulsar4sVersion

libraryDependencies ++= Seq(
  pulsar4s, pulsar4sCirce
)
``
Then we will define the message payload for a sensor event as follows:

然后我们为传感器事件定义消息负载,如下所示:


case class SensorEvent(sensorId: String,
                         status: String,
                         startupTime: Long,
                         eventTime: Long,
                         reading: Double)

我们还需要引入以下范围内容:

import com.sksamuel.pulsar4s.{DefaultProducerMessage, EventTime, ProducerConfig, PulsarClient, Topic}
import io.ipolyzos.models.SensorDomain
import io.ipolyzos.models.SensorDomain.SensorEvent
import io.circe.generic.auto._
import com.sksamuel.pulsar4s.circe._
import scala.concurrent.ExecutionContext.Implicits.global

所有应用程序的生产和消费的主要入口点是 Pulsar Client,它处理与 broker 的连接。在 Pulsar 客户端中,您还可以设置集群的身份验证或者调整其他重要的配置,例如超时设置和连接池。您可以通过提供要连接的 service url 来简单地实例化客户端。

val pulsarClient = PulsarClient("pulsar://localhost:6650")

有了客户端,让我们看看生产者的初始化和循环。

val topic = Topic("sensor-events")

// create the producer
val eventProducer = pulsarClient.producer[SensorEvent](ProducerConfig(
  topic, 
  producerName = Some("sensor-producer"), 
  enableBatching = Some(true),
  blockIfQueueFull = Some(true))
)

// sent 100 messages 
(0 until 100) foreach { _ =>
   val sensorEvent = SensorDomain.generate()
   val message = DefaultProducerMessage(
      Some(sensorEvent.sensorId), 
      sensorEvent, 
      eventTime = Some(EventTime(sensorEvent.eventTime)))
  
   eventProducer.sendAsync(message) // use the async method to sent the message
}

这里有几点需要注意:

  • 我们通过提供必要的配置来创建生产者 —— 生产者和消费者都是高度可配的,可以根据所需场景进行配置。
  • 我们在此处提供生产者的主题名称,启用批处理并使生产者在队列已满时阻塞操作。
  • 通过启用批处理,Pulsar 将使用内部队列来保存消息(默认值为 1000),并在队列满后将批次发送消息给 broker。
  • 正如您在示例代码中看到的,我们使用 .sendAsync() 方法将消息发送到 Pulsar。这是在不等待确认的情况下发送消息,同时由于我们将消息缓冲到队列中,这可能会使客户端不堪重负。
  • 通过选项 blockIfQueueFull 使用反压机制并通知生产者在发送更多消息之前等待。
  • 最后,我们创建要发送的消息。这里我们指定 sensorId 作为消息的键、sensorEvent 的值,我们还提供 eventTime,即事件产生的时间。

此时,我们的生产者就位,开始向 Pulsar 发送消息。完整的实现详见此处

4. Apache Pulsar 消费者

现在让我们把注意力转移到消费方面。就像我们对生产端所做的那样,消费端需要配置一个 Pulsar Client。

val consumerConfig = ConsumerConfig(
  Subscription("sensor-event-subscription"), 
  Seq(Topic("sensor-events")), 
  consumerName = Some("sensor-event-consumer"), 
  subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest), 
  subscriptionType = Some(SubscriptionType.Exclusive)
)

val consumerFn = pulsarClient.consumer[SensorEvent](consumerConfig) 

var totalMessageCount = 0 
while (true) {   
  consumerFn.receive match {
     case Success(message) => 
         consumerFn.acknowledge(message.messageId)
         totalMessageCount += 1
         println(s"Total Messages '$totalMessageCount - Acked Message: ${message.messageId}")
      case Failure(exception) => 
         println(s"Failed to receive message: ${exception.getMessage}")
  }
}

步骤依次如下:

  • 同样的,首先创建消费者配置。在这里我们指定一个订阅名称,要订阅的主题、消费者的名称,以及我们想要消费者开始消费消息的位置 —— 在这里我们指定 Earliest - 这意味着订阅将在其确认的最后一条消息之后开始读取。
  • 最后,我们指定 SubscriptionType - 在这里它是 Exclusive (独占)类型,这也是默认的订阅类型(稍后会详细介绍订阅类型)。
  • 配置就绪后,使用创建的配置设置一个新的消费者,然后我们就有了一个简单的消费循环 —— 我们要做的就是使用接收方法读取一条新消息,该方法会阻塞直到有消息可用,然后我们确认消息,最后获取到目前为止收到的消息总数以及已确认的 messageId。
  • 请注意:当收到新消息时,如果一切顺利,您需要向客户端确认,否则需要使用 negativeAcknowledge() 方法否定确认消息。
  • 有了消费实现方式,我们就有了一个在运行的发布-订阅应用程序,它为 Pulsar 主题生产传感器事件,以及一个订阅并消费这些消息的消费者。
  • 消费者的完整实现详见此处

5. Apache Pulsar 订阅类型

正如文章中提到的,Apache Pulsar 通过提供不同的订阅类型做到了提供消息和流模式的统一。

Pulsar 有以下订阅类型:

  • 独占订阅:任何时间点都只允许一个消费者使用订阅读取消息
  • 灾备订阅:在任何时间点只允许一个消费者使用订阅读取消息,但您可以有多个备用消费者来接手工作,以防活跃的消费者失败
  • 共享订阅:可以将多个消费者附加到订阅上,并以轮询方式在它们之间共享工作。
  • 键共享订阅:可以将多个消费者附加到订阅上,并且每个消费者都分配有一组唯一的键。该消费者负责处理分配给它的一组键。如果失败,将为另一个消费者分配该组键。

博文推荐|使用 Apache Pulsar 和 Scala 进行事件流处理

不同的订阅类型用于不同的场景。例如,为了实现典型的扇出消息传递模式,您可以使用独占或灾备订阅类型。对于消息队列和工作队列,可选择共享订阅,以便在多个消费者之间共享工作;而对于流场景或基于键的流处理,灾备和键共享订阅不失为很好的选择,它们可以允许有序消费或基于某些键扩展您的处理。

6. 总结与阅读延伸

在这篇文章中,我们简要介绍了 Apache Pulsar 是什么、它作为统一的消息流平台与脱颖而出的能力、如何创建一些简单的生产和消费应用程序,最后我们重点介绍了 Pulsar 是如何通过不同的订阅模式统一消息和流。

延伸阅读:

  • Pulsar IO:轻松地将数据移入和移出 Pulsar
  • Pulsar Functions(Pulsar 的无服务器和轻量级的计算框架):用户可以使用它们在 Pulsar 主题上处理逻辑,减少生产和消费应用程序所需的所有样板代码
  • Function Mesh:通过利用 Kubernetes 原生功能(如部署和自动缩放)使您的事件流真正成为云原生。

关注公众号「Apache Pulsar」,获取更多技术干货

加入 Apache Pulsar 中文交流群👇🏻

博文推荐|使用 Apache Pulsar 和 Scala 进行事件流处理

点击链接阅读英文原文

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
4年前
VBox 启动虚拟机失败
在Vbox(5.0.8版本)启动Ubuntu的虚拟机时,遇到错误信息:NtCreateFile(\\Device\\VBoxDrvStub)failed:0xc000000034STATUS\_OBJECT\_NAME\_NOT\_FOUND(0retries) (rc101)Makesurethekern
Wesley13 Wesley13
4年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Easter79 Easter79
4年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Wesley13 Wesley13
4年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
智码领航员
智码领航员
Lv1
汴水流,泗水流,流到瓜洲古渡头,吴山点点愁。思悠悠,恨悠悠,恨到归时方始休,月明人倚楼。
文章
5
粉丝
0
获赞
0