Cloudflare 的 Kafka 之旅:万亿级消息处理实践

在不到 6 年的时间里,Cloudflare 已经向 Kafka 中生成了超过 1 万亿条用于服务间通信的消息。随着公司和应用程序服务团队的发展,他们必须不断调整他们的工具来支持持续的快速交付。

 

我们将在本文中介绍我们早期的分布式团队工作,以及如何基于 Kafka 构建抽象来处理万亿条消息。

 

我们还将介绍近年来可伸缩性给我们带来的挑战,以及我们为应对不断增长的需求而采用的一些方法和模式。

 

Cloudflare 简介

 

Cloudflare 为用户提供了一个全局网络,并帮助他们提升他们网站、API 和网络流量的安全性。

 

这个网络还可以保护企业网,让他们能够在边缘运行和部署整个应用程序。

 

Cloudflare 提供了一系列产品,包括 CDN、Zero Trust 和 Cloudflare Workers,来实现这些目标,以及识别和阻止恶意活动,让用户能够把精力专注在他们的业务上。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 1 Cloudflare 的全局网络

 

从工程角度来看,Cloudflare 网络有两个主要组成部分:全局边缘网络和 Cloudflare 控制平面。

 

这个网络的很大一部分是使用 Cloudflare 的产品来构建的,其中 Workers 被部署在边缘网络上。另一方面,控制平面是一组数据中心,在数据中心的机器上运行着 Kubernetes、Kafka 和数据库。Kafka 生产者和消费者通常都部署在 Kubernetes 中,但也取决于工作负载和期望获得的结果。

 

在本文中,我们将重点关注 Cloudflare 控制平面,并探索如何通过扩展服务间通信和工具来为运营提供支持。

 

Kafka

 

Apache Kafka有集群的概念,集群由多个 broker 组成,每个集群都有一个指定的首领 broker 负责协调通信。在下图中,Broker 2 就是首领。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 2 Kafka 集群

 

消息被分类到主题中,例如用户事件(创建用户或更新用户信息等)被放在同一个主题中。主题被分为分区,这样 Kafka 可以很好地进行横向伸缩。在图中,两个 broker 上都有主题 A 的分区,每个分区都有一个指定的首领来确定其“真相来源”。为了确保弹性,分区根据指定的复制系数进行复制,通常最小的复制系数为 3。向 Kafka 发送消息的服务叫作生产者,从 Kafka 读取消息的服务叫作消费者。

 

Cloudflare 的工程文化

 

在过去,Cloudflare 是一个独立的 PHP 应用程序,随着公司的发展和多样化,这种方式暴露出了它的局限性和风险。

 

现在,团队不再强制指定使用特定的工具或编程语言,他们可以构建和维护他们自己的服务,公司鼓励创新并倡导使用高效的工具和实践。应用程序服务团队是一个相对较新的团队,职责是通过提供遵循最佳实践的工具帮助其他团队更容易完成工作,这样开发团队就可以专注于交付业务价值。

 

紧密耦合

 

随着产品规模的增长,有必要找到更好的方法,让团队能够按照自己的节奏前进,并与其他团队解耦,工程团队也需要对回退请求和工作完成保证有更多的控制。

 

由于我们已经在使用 Kafka 集群来处理大量的数据,所以我们决定花点时间创建一个通用的消息总线集群:很简单,只需要创建一个拉取请求,设置一个新主题所需的一切,包括复制策略、保留期限和 ACL。下图说明了消息总线集群是如何帮助解耦团队的。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 3 通用的消息总线集群

 

上图中的三个团队可以发出审计日志系统感兴趣的消息,而不需要了解特定的服务。更少的耦合让工程团队可以更有效地完成工作。

 

非结构化的通信

 

对于事件驱动的系统来说,为了避免耦合,系统之间不应该相互感知。最初,我们没有强制要求使用怎样的消息格式,生产者团队可以自己决定如何生产消息。如果团队之间没有严格的契约,这可能会导致非结构化的通信,并带来挑战,无法被处理的消息数量会激增。

 

为了避免出现非结构化的通信,应用程序服务团队在 Kafka 生态系统中寻找解决方案,并找到了两个可行的选择——Apache Avro和protobuf,并最终选择了后者。我们以前一直使用 JSON,但发现很难实现兼容性,而且 JSON 消息比 protobuf 大。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 4 protobuf 消息示例

 

protobuf 提供了严格的消息类型和向前和向后兼容性,能够用多种语言生成代码也是它的一个主要优势。应用程序服务团队建议在 protobuf 消息中加入详细的注释,并使用 Uber 的开源工具Prototool进行变更检测和强制执行风格规则检查。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 5 切换到 Protobuf

 

只有 Protobuf 是不够的:不同的团队仍然可以向同一个主题发送消息,由于格式不符合预期,消费者可能无法处理它们。此外,配置 Kafka 的消费者和生产者并不是一件容易的事情,需要对工作负载有充分的了解。由于大多数团队都在使用 Go,我们决定使用 Go 构建一个“消息总线客户端库”,既可以遵循最佳实践,也可以让团队更快地完成他们的工作。

 

为了避免团队向同一个主题发送不同的消息,我们做出了一个有争议的决定,即(在客户端)强制每个主题使用一种 protobuf 消息类型。这个决定很容易被采用,但它会导致创建太多的主题,复制太多的分区,因为分区的复制系数至少为 3。

 

连接器

 

通过引入工具和抽象,应用程序服务团队在简化 Kafka 基础设施方面取得了重大进展,但我们发现,要遵循最佳实践,还有很多应用场景和模式需要处理,于是团队开发了连接器框架。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 6 连接器框架

 

这个框架基于 Kafka 连接器,工程师们可以用它创建从一个系统读取数据并将其推送到另一个系统(如 Kafka 或Quicksilver——Cloudflare 的 Edge 数据库)的服务。为了简化使用过程,我们使用Cookiecutter来创建服务模板,工程师们只需要在命令行上输入一些参数。

 

连接器的配置过程很简单,可以通过环境变量来设置,无需修改代码。

 

下面的示例将从 Kafka 读取数据并将其写入 Quicksilver。连接器被设置为从 topic1 和 topic2 读取数据,并应用 pf_edge 函数。完整的配置就是这些,其中还包括指标、警报和部署到生产环境所需的其他所有东西。工程团队可以选择注册自定义转换器,这些是他们唯一需要编写的代码。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 7 连接器示例

 

例如,我们在通信方式首选项服务中使用了连接器:如果用户希望在 Cloudflare 仪表盘中移除营销信息,他们可以通过与这个服务发生交互来完成这个操作。通信方式首选项变更事件被保存在数据库中,并向 Kafka 发送消息。为了确保变更可以在三个不同的源系统中生效,我们使用单独的连接器分别将变更同步到事务性电子邮件服务、客户管理系统和市场电子邮件系统。这种方法可以让系统最终保持一致,我们利用 Kafka 提供的保证机制来确保整个过程顺利进行。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 8 连接器和通信方式首选项变更

 

可见性

 

在疫情期间,随着用户的迅速增长,吞吐量也在快速增长,我们创建的一些抽象的可伸缩性问题就凸显了出来。

 

以我们为 Kafka 用户处理的审计日志为例:我们建立了一个系统来管理这些日志,生产者生成事件,我们监听它们,并将数据记录到数据库中。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 9 审计日志推送

 

我们通过一个 API 和日志推送集成方式来暴露这些信息,我们通过日志推送将审计日志推送到不同的数据桶中,比如Cloudflare R2或 Amazon S3。

 

在疫情期间,新注册的审计日志比以往更多,用户开始使用我们的 API 来获取最新数据。由于这种方法不可伸缩,我们决定开发一个管道来解决这个问题。我们创建了一个小型服务来监听审计日志事件,并将其转换为适当的格式,然后直接存储到数据桶中,而不会让 API 过载。

 

随着日志越积累越多,并且无法足够快地清除它们,我们遇到了更大的问题,导致出现延迟和违反 SLA。我们无法确切知道导致延迟的原因,因为我们的 SDK 缺少诊断问题的工具:瓶颈在于从 Kafka 读取数据、转换数据还是将数据保存到数据库?


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 10 瓶颈在哪里?

 

我们决定使用 Prometheus 指标来解决这个问题。直方图指标可以知道处理消息的每个步骤需要花费多少时间,这有助于我们识别出较慢的步骤,但我们仍然无法判断哪个组件处理特定消息花费的时间更长。为了解决这个问题,我们对 OpenTelemetry 进行了调研,重点关注它的跟踪集成能力:Kafka 方面并没有很多好的 OpenTracing 集成工具,而且跨多个服务传播跟踪事件也是件具有挑战性的事情。

 

随着团队使用 OpenTracing 来增强 SDK,我们发现将数据推送到数据桶和从 Kafka 读取数据都是瓶颈,并优先修复了这些问题。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 11 找出瓶颈所在

 

将指标添加到 SDK 中,我们能够更好地了解集群和服务的健康状况。

 

通知噪音

 

我们遇到了一个问题:因为收集了大量指标,出现了通知噪音,其中有许多与不健康的应用程序和延迟问题相关的警报。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 12 警报管道

 

基本的警报管道由 Prometheus 和 AlertManager 组成,它们会将警报转到 PagerDuty。由于服务的重启或伸缩并不是很理想,所以我们决定使用 Kubernetes,并实现服务的健康检查。

 

Kubernetes 中有三种类型的健康检查:活动、准备就绪和启动。对于 Kafka 来说,准备就绪探测没有用,因为通常情况下,HTTP 服务器是不公开的。为了解决这个问题,我们实现了另一种方法。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 13 健康检查和 Kafka

 

在收到活动检查请求时,我们让一个 broker 执行一个基本操作,例如列出主题,如果响应成功,则检查通过。然而,在某些情况下,应用程序虽然在运行,但无法生产或消费消息,于是我们为消费者实现了更智能的健康状况检查。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 14 健康检查的实现

 

Kafka 的当前偏移量就是分区最后一个可用的偏移量,而提交的偏移量是消费者成功消费的最后一个偏移量。

 

我们在进行健康状况检查时读取这些偏移量,这样就确定消费者运行是否正确:如果无法读取偏移量,则可能出现了问题,并且将消费者报告为不健康。如果可读取偏移量,我们将最后提交的偏移量与当前偏移量进行比较。如果它们相同,则没有追加的新消息,那么我们就认为消费者是健康的。如果最后一次提交的偏移量不同,我们就检查它是否与之前记录的最后一次提交的偏移量相同,这样可以知道消费者是否卡住了并需要重启。这个过程带来了更好的通知体验和用户体验。

 

消费延迟

 

我们有一个为电子邮件系统生成 Kafka 事件的系统。这些事件包含了一个模板,例如,一个“受攻击”模板,其中包含了受攻击网站的信息和攻击者的身份,以及元数据。

 

我们监听这些事件,从注册中心获取电子邮件模板,对其进行填充,并将其发送给客户。然而,我们开始遇到负载问题:生产事件的速率出现了峰值,导致了消费延迟,影响到了重要的 OTP 和 SLO。


Cloudflare 的 Kafka 之旅:万亿级消息处理实践


图 15 消费延迟

 

批次处理

 

我们开始研究通过不同的解决方案来解决这个问题,最初的解决方案是扩展分区和消费者的数量,但并没有获得显著的改进。



图 16 批次处理

 

我们决定使用一种更简单但更有效的方法——批次处理,即一次处理一定数量的消息、执行转换和分发。这种方式被证明是有效的,让团队可以轻松地应对高生产速率。

 

文档化

 

在开发 SDK 时,我们发现许多开发人员在使用时会遇到问题。有些人发现有 bug,有些人不确定如何实现某些功能或不知道一些错误是什么意思。为了解决这个问题,我们在 Google Chat 上创建了频道,让他们来问我们问题。我们有一个值班的人负责回复问题,并在我们的维基页上记录我们发现的问题和答案。这有助于改善 SDK 的整体用户体验。

 

结论

 

我们总结了四个教训:

 

  • 始终在灵活性和简单性之间找到恰当的平衡:可配置的设置步骤提供了更大的灵活性,而更简单的设置有助于进行跨不同管道的标准化。

  • 可见性:尽早将指标添加到 SDK 中,这样可以帮助团队了解系统的行为并做出更好的决策,特别是在发生故障时。

  • 契约:建立强大的、严格的契约,让人们能够很好地了解主题内部发生了什么,知道谁在写入和读取消息。

  • 将所做的事情记录下来,这样就不需要花时间答疑解惑或帮助人们调试问题。这可以通过 Google Chat 和维基页等方式来实现。

 

我们遵循这些原则,改进了我们的系统,让我们的客户感到满意,即使是在面临高度压力的情况下。

链接:https://www.infoq.cn/article/ZHFZHsctlAhN8Ai9wdpo

(版权归原作者所有,侵删)


标签

发表评论

苏ICP备2023047577号-1