如何在Debian10上安装ApacheKafka

来自菜鸟教程
跳转至:导航、​搜索

作为 Write for DOnations 计划的一部分,作者选择了 Free and Open Source Fund 来接受捐赠。

介绍

Apache Kafka 是一种流行的分布式消息代理,旨在处理大量实时数据。 与 ActiveMQRabbitMQ 等其他消息代理相比,Kafka 集群具有高度可扩展性和容错性,并且具有更高的吞吐量。 虽然它通常用作 发布/订阅 消息传递系统,但许多组织也将它用于日志聚合,因为它为发布的消息提供持久存储。

发布/订阅消息系统允许一个或多个生产者发布消息,而无需考虑消费者的数量或他们将如何处理消息。 订阅的客户端会自动收到有关更新和新消息创建的通知。 与客户端定期轮询以确定是否有新消息可用的系统相比,该系统更有效且可扩展。

在本教程中,您将在 Debian 10 服务器上安全地安装和配置 Apache Kafka 2.1.1,然后通过生成和使用 Hello World 消息来测试您的设置。 然后,您可以选择安装 KafkaT 来监控 Kafka 并设置 Kafka 多节点集群。

先决条件

要继续进行,您将需要:

  • 一台至少有 4GB RAM 的 Debian 10 服务器和一个具有 sudo 权限的非 root 用户。 如果您没有设置非 root 用户,请按照我们的 Debian 10 初始服务器设置指南中指定的步骤进行操作。
  • OpenJDK 11 安装在您的服务器上。 要安装此版本,请按照 How To Install Java with Apt on Debian 10 中关于安装特定版本的 OpenJDK 的说明进行操作。 Kafka 是用 Java 编写的,因此需要 JVM。

注意: 没有 4GB RAM 的安装可能会导致 Kafka 服务失败,Java 虚拟机 (JVM) 在启动期间抛出 Out Of Memory 异常。


第 1 步 — 为 Kafka 创建用户

由于 Kafka 可以通过网络处理请求,因此为它创建一个专用用户是最佳实践。 如果 Kafka 服务器受到威胁,这可以最大限度地减少对 Debian 机器的损坏。 您将在此步骤中创建专用用户 kafka

以非 root sudo 用户身份登录,使用 useradd 命令创建一个名为 kafka 的用户:

sudo useradd kafka -m

-m 标志确保将为用户创建主目录。 这个主目录 /home/kafka 将充当您稍后执行命令的工作区目录。

使用 passwd 设置密码:

sudo passwd kafka

输入您希望用于该用户的密码。

接下来,使用 adduser 命令将 kafka 用户添加到 sudo 组,使其具有安装 Kafka 依赖项所需的权限:

sudo adduser kafka sudo

您的 kafka 用户现已准备就绪。 使用 su 登录此帐户:

su -l kafka

现在您已经创建了特定于 Kafka 的用户,您可以继续下载和提取 Kafka 二进制文件。

第 2 步 — 下载和提取 Kafka 二进制文件

在此步骤中,您将下载 Kafka 二进制文件并将其解压缩到 kafka 用户主目录中的专用文件夹中。

首先,在 /home/kafka 中创建一个名为 Downloads 的目录来存储您的下载:

mkdir ~/Downloads

接下来,使用 apt-get 安装 curl 以便您能够下载远程文件:

sudo apt-get update && sudo apt-get install curl

出现提示时,键入 Y 以确认 curl 下载。

安装 curl 后,使用它下载 Kafka 二进制文件:

curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz

创建一个名为 kafka 的目录并切换到该目录。 这将是 Kafka 安装的基本目录:

mkdir ~/kafka && cd ~/kafka

使用 tar 命令提取您下载的存档:

tar -xvzf ~/Downloads/kafka.tgz --strip 1

您指定了 --strip 1 标志以确保存档的内容提取到 ~/kafka/ 本身,而不是在其中的另一个目录中,例如 ~/kafka/kafka_2.12-2.1.1/

现在您已经成功下载并提取了二进制文件,您可以继续配置 Kafka 以允许删除主题。

第 3 步 — 配置 Kafka 服务器

Kafka 的默认行为不允许我们删除 topic、可以向其发布消息的类别、组或提要名称。 要修改它,您将编辑配置文件。

Kafka的配置选项在server.properties中指定。 使用 nano 或您喜欢的编辑器打开此文件:

nano ~/kafka/config/server.properties

让我们添加一个允许我们删除 Kafka 主题的设置。 将以下突出显示的行添加到文件底部:

~/kafka/config/server.properties

...
group.initial.rebalance.delay.ms

delete.topic.enable = true

保存文件,退出nano。 现在您已经配置了 Kafka,您可以创建 systemd 单元文件以在启动时运行和启用 Kafka。

第 4 步 - 创建 Systemd 单元文件并启动 Kafka 服务器

在本节中,您将为 Kafka 服务创建 systemd 单元文件。 这将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,例如启动、停止和重新启动 Kafka。

ZooKeeper 是 Kafka 用来管理其集群状态和配置的服务。 它通常在分布式系统中作为一个组成部分使用。 在本教程中,您将使用 Zookeeper 来管理 Kafka 的这些方面。 如果您想了解更多信息,请访问官方 ZooKeeper 文档

首先,为 zookeeper 创建单元文件:

sudo nano /etc/systemd/system/zookeeper.service

在文件中输入以下单位定义:

/etc/systemd/system/zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit] 部分指定 ZooKeeper 需要网络,并且文件系统在启动之前准备好。

[Service] 部分指定 systemd 应该使用 zookeeper-server-start.shzookeeper-server-stop.sh shell 文件来启动和停止服务。 它还指定了如果 ZooKeeper 异常退出应该自动重启。

接下来,为 kafka 创建 systemd 服务文件:

sudo nano /etc/systemd/system/kafka.service

在文件中输入以下单位定义:

/etc/systemd/system/kafka.service

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit] 部分指定此单元文件依赖于 zookeeper.service。 这将确保 zookeeperkafka 服务启动时自动启动。

[Service] 部分指定 systemd 应该使用 kafka-server-start.shkafka-server-stop.sh shell 文件来启动和停止服务。 它还指定了如果Kafka异常退出应该自动重启。

现在已经定义了单位,使用以下命令启动 Kafka:

sudo systemctl start kafka

为确保服务器已成功启动,请检查 kafka 单元的日志日志:

sudo journalctl -u kafka

您将看到类似于以下内容的输出:

OutputMar 23 13:31:48 kafka systemd[1]: Started kafka.service.

您现在有一个 Kafka 服务器正在侦听端口 9092,这是 Kafka 的默认端口。

您已经启动了 kafka 服务,但是如果您要重新启动服务器,它还不会自动启动。 要在服务器启动时启用 kafka,请运行:

sudo systemctl enable kafka

现在您已经启动并启用了服务,是时候检查安装了。

第 5 步 — 测试安装

让我们发布并使用 Hello World 消息,以确保 Kafka 服务器的行为正确。 在 Kafka 中发布消息需要:

  • 一个 producer,可以将记录和数据发布到主题。
  • 一个 consumer,它从主题中读取消息和数据。

首先,通过键入以下内容创建一个名为 TutorialTopic 的主题:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用 kafka-console-producer.sh 脚本从命令行创建生产者。 它需要 Kafka 服务器的主机名、端口和主题名称作为参数。

通过键入以下内容将字符串 Hello, World 发布到 TutorialTopic 主题:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

--broker-list 标志确定将消息发送到的消息代理列表,在本例中为 localhost:9092--topic 将主题指定为 TutorialTopic

接下来,您可以使用 kafka-console-consumer.sh 脚本创建 Kafka 消费者。 它需要 ZooKeeper 服务器的主机名和端口以及主题名称作为参数。

以下命令使用来自 TutorialTopic 的消息。 注意 --from-beginning 标志的使用,它允许消费在消费者启动之前发布的消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server `localhost:9092` --topic TutorialTopic --from-beginning

--bootstrap-server 提供进入 Kafka 集群的入口列表。 在这种情况下,您使用的是 localhost:9092

您将在终端中看到 Hello, World

OutputHello, World

该脚本将继续运行,等待更多消息发布到该主题。 随意打开一个新终端并启动一个生产者来发布更多消息。 您应该能够在消费者的输出中看到它们。 如果您想了解更多关于如何使用 Kafka 的信息,请参阅 Kafka 官方文档

完成测试后,按 CTRL+C 停止消费者脚本。 现在您已经测试了安装,您可以继续安装 KafkaT 以更好地管理您的 Kafka 集群。

第 6 步 — 安装 KafkaT(可选)

KafkaT 是 Airbnb 的一款工具,可让您更轻松地查看有关 Kafka 集群的详细信息并从命令行执行某些管理任务。 因为它是一个 Ruby gem,你需要 Ruby 来使用它。 您还需要 build-essential 包才能构建它所依赖的其他 gem。 使用 apt 安装它们:

sudo apt install ruby ruby-dev build-essential

您现在可以使用 gem 命令安装 KafkaT:

sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

CFLAGS=-Wno-error=format-overflow 选项禁用格式溢出警告,并且是 ZooKeeper gem 所必需的,它是 KafkaT 的依赖项。

KafkaT 使用 .kafkatcfg 作为配置文件来确定您的 Kafka 服务器的安装和日志目录。 它还应该有一个将 KafkaT 指向 ZooKeeper 实例的条目。

创建一个名为 .kafkatcfg 的新文件:

nano ~/.kafkatcfg

添加以下行以指定有关 Kafka 服务器和 Zookeeper 实例的所需信息:

~/.kafkatcfg

{
  "kafka_path": "~/kafka",
  "log_path": "/tmp/kafka-logs",
  "zk_path": "localhost:2181"
}

您现在可以使用 KafkaT。 首先,您可以使用它来查看有关所有 Kafka 分区的详细信息:

kafkat partitions

您将看到以下输出:

OutputTopic                 Partition   Leader      Replicas        ISRs    
TutorialTopic         0             0         [0]             [0]
__consumer_offsets    0             0         [0]             [0]
...

此输出显示 TutorialTopic 以及 __consumer_offsets,这是 Kafka 用于存储客户端相关信息的内部主题。 您可以放心地忽略以 __consumer_offsets 开头的行。

要了解有关 KafkaT 的更多信息,请参阅其 GitHub 存储库

现在您已经安装了 KafkaT,您可以选择在 Debian 10 服务器集群上设置 Kafka 以创建多节点集群。

第 7 步 — 设置多节点集群(可选)

如果您想使用更多 Debian 10 服务器创建多代理集群,请在每台新机器上重复步骤 1、步骤 4 和步骤 5。 此外,在 ~/kafka/config/server.properties 文件中对每个进行以下更改:

  • 更改 broker.id 属性的值,使其在整个集群中是唯一的。 此属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。 例如,"server1""server2" 等可用作标识符。
  • 更改 zookeeper.connect 属性的值,使所有节点都指向同一个 ZooKeeper 实例。 此属性指定 ZooKeeper 实例的地址并遵循 <HOSTNAME/IP_ADDRESS>:<PORT> 格式。 对于本教程,您将使用 your_first_server_IP:2181,将 your_first_server_IP 替换为您已设置的 Debian 10 服务器的 IP 地址。

如果您希望集群有多个 ZooKeeper 实例,则每个节点上的 zookeeper.connect 属性的值应该是相同的逗号分隔字符串,列出所有 ZooKeeper 实例的 IP 地址和端口号。

注意: 如果您在安装了 Zookeeper 的 Debian 10 服务器上激活了防火墙,请确保打开端口 2181 以允许来自集群中其他节点的传入请求。


第 8 步——限制 Kafka 用户

现在所有安装都已完成,您可以删除 kafka 用户的管理员权限。 在您这样做之前,请注销并以任何其他非 root sudo 用户身份重新登录。 如果您仍在运行本教程开始时使用的同一 shell 会话,只需键入 exit

从 sudo 组中删除 kafka 用户:

sudo deluser kafka sudo

为了进一步提高您的 Kafka 服务器的安全性,请使用 passwd 命令锁定 kafka 用户的密码。 这确保没有人可以使用此帐户直接登录服务器:

sudo passwd kafka -l

此时,只有 root 或 sudo 用户可以通过键入以下命令以 kafka 身份登录:

sudo su - kafka

以后如果要解锁,请使用 passwd-u 选项:

sudo passwd kafka -u

您现在已成功限制 kafka 用户的管理员权限。

结论

现在,Apache Kafka 在您的 Debian 服务器上安全运行。 您可以通过使用 Kafka 客户端 创建 Kafka 生产者和消费者来在您的项目中使用它,这些客户端可用于大多数编程语言。 要了解更多关于 Kafka 的信息,您还可以查阅 Apache Kafka 文档