如何在CentOS7上安装ApacheKafka
作为 Write for DOnations 计划的一部分,作者选择了 Free and Open Source Fund 来接受捐赠。
介绍
Apache Kafka 是一种流行的分布式消息代理,旨在高效处理大量实时数据。 Kafka 集群不仅具有高度可扩展性和容错性,而且与 ActiveMQ 和 RabbitMQ 等其他消息代理相比,它还具有更高的吞吐量。 虽然它通常用作 发布/订阅 消息传递系统,但许多组织也将它用于日志聚合,因为它为发布的消息提供持久存储。
发布/订阅消息系统允许一个或多个生产者发布消息,而无需考虑消费者的数量或他们将如何处理消息。 订阅的客户端会自动收到有关更新和新消息创建的通知。 与客户端定期轮询以确定是否有新消息可用的系统相比,该系统更有效且可扩展。
在本教程中,您将在 CentOS 7 上安装和使用 Apache Kafka 2.1.1。
先决条件
要继续进行,您将需要:
- 一台 CentOS 7 服务器和一个具有 sudo 权限的非 root 用户。 如果您没有设置非 root 用户,请按照本 指南 中指定的步骤进行操作。
- 服务器上至少有 4GB 的 RAM。 没有这么多 RAM 的安装可能会导致 Kafka 服务失败,Java 虚拟机 (JVM) 在启动期间抛出“内存不足”异常。
- OpenJDK 8 安装在您的服务器上。 要安装此版本,请按照 这些说明 安装特定版本的 OpenJDK。 Kafka 是用 Java 编写的,因此需要 JVM; 但是,它的启动 shell 脚本有一个版本检测错误,导致它无法以 8 以上的 JVM 版本启动。
第 1 步 — 为 Kafka 创建用户
由于 Kafka 可以通过网络处理请求,因此您应该为其创建一个专用用户。 如果 Kafka 服务器受到威胁,这可以最大限度地减少对 CentOS 机器的损害。 我们将在此步骤中创建一个专用的 kafka 用户,但是一旦您完成了 Kafka 的设置,您应该创建一个不同的非 root 用户来在此服务器上执行其他任务。
以非 root sudo 用户身份登录,使用 useradd
命令创建一个名为 kafka 的用户:
sudo useradd kafka -m
-m
标志确保将为用户创建主目录。 这个主目录 /home/kafka
将作为我们的工作区目录,用于执行以下部分中的命令。
使用 passwd
设置密码:
sudo passwd kafka
使用 adduser
命令将 kafka 用户添加到 wheel
组,使其具有安装 Kafka 依赖项所需的权限:
sudo usermod -aG wheel kafka
您的 kafka 用户现已准备就绪。 使用 su
登录此帐户:
su -l kafka
现在我们已经创建了特定于 Kafka 的用户,我们可以继续下载和提取 Kafka 二进制文件。
第 2 步 — 下载和提取 Kafka 二进制文件
让我们下载 Kafka 二进制文件并将其解压缩到 kafka 用户主目录中的专用文件夹中。
首先,在 /home/kafka
中创建一个名为 Downloads
的目录来存储您的下载:
mkdir ~/Downloads
使用 curl
下载 Kafka 二进制文件:
curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
创建一个名为 kafka
的目录并切换到该目录。 这将是 Kafka 安装的基本目录:
mkdir ~/kafka && cd ~/kafka
使用 tar
命令提取您下载的存档:
tar -xvzf ~/Downloads/kafka.tgz --strip 1
我们指定 --strip 1
标志以确保存档的内容提取到 ~/kafka/
本身,而不是在其中的另一个目录(例如 ~/kafka/kafka_2.11-2.1.1/
)中。
现在我们已经成功下载并提取了二进制文件,我们可以继续配置到 Kafka 以允许删除主题。
第 3 步 — 配置 Kafka 服务器
Kafka 的默认行为不允许我们删除 topic、可以向其发布消息的类别、组或提要名称。 要修改它,让我们编辑配置文件。
Kafka的配置选项在server.properties
中指定。 使用 vi
或您喜欢的编辑器打开此文件:
vi ~/kafka/config/server.properties
让我们添加一个允许我们删除 Kafka 主题的设置。 按 i
插入文本,并将以下内容添加到文件底部:
~/kafka/config/server.properties
delete.topic.enable = true
完成后,按 ESC
退出插入模式,按 :wq
将更改写入文件并退出。 现在我们已经配置了 Kafka,我们可以继续创建 systemd 单元文件以在启动时运行和启用它。
第 4 步 - 创建 Systemd 单元文件并启动 Kafka 服务器
在本节中,我们将为 Kafka 服务创建 systemd 单元文件 。 这将帮助我们以与其他 Linux 服务一致的方式执行常见的服务操作,例如启动、停止和重新启动 Kafka。
Zookeeper 是 Kafka 用来管理其集群状态和配置的服务。 它通常在许多分布式系统中作为一个集成组件使用。 如果您想了解更多信息,请访问官方 Zookeeper 文档。
为 zookeeper
创建单元文件:
sudo vi /etc/systemd/system/zookeeper.service
在文件中输入以下单位定义:
/etc/systemd/system/zookeeper.service
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
[Unit]
部分指定 Zookeeper 需要网络和文件系统准备好才能启动。
[Service]
部分指定 systemd 应该使用 zookeeper-server-start.sh
和 zookeeper-server-stop.sh
shell 文件来启动和停止服务。 它还指定了如果 Zookeeper 异常退出应该自动重启。
完成编辑后保存并关闭文件。
接下来,为 kafka
创建 systemd 服务文件:
sudo vi /etc/systemd/system/kafka.service
在文件中输入以下单位定义:
/etc/systemd/system/kafka.service
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1' ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
[Unit]
部分指定此单元文件依赖于 zookeeper.service
。 这将确保 zookeeper
在 kafa
服务启动时自动启动。
[Service]
部分指定 systemd 应该使用 kafka-server-start.sh
和 kafka-server-stop.sh
shell 文件来启动和停止服务。 它还指定了如果Kafka异常退出应该自动重启。
完成编辑后保存并关闭文件。
现在已经定义了单位,使用以下命令启动 Kafka:
sudo systemctl start kafka
为确保服务器已成功启动,请检查 kafka
单元的日志日志:
journalctl -u kafka
您应该会看到类似于以下内容的输出:
OutputJul 17 18:38:59 kafka-centos systemd[1]: Started kafka.service.
您现在有一个 Kafka 服务器正在侦听端口 9092
。
虽然我们已经启动了 kafka
服务,但如果我们要重新启动我们的服务器,它不会自动启动。 要在服务器启动时启用 kafka
,请运行:
sudo systemctl enable kafka
现在我们已经启动并启用了服务,让我们检查安装。
第 5 步 — 测试安装
让我们发布并使用 “Hello World” 消息,以确保 Kafka 服务器正常运行。 在 Kafka 中发布消息需要:
- 一个 producer,可以将记录和数据发布到主题。
- 一个 consumer,它从主题中读取消息和数据。
首先,通过键入以下内容创建一个名为 TutorialTopic
的主题:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您将看到以下输出:
OutputCreated topic "TutorialTopic".
您可以使用 kafka-console-producer.sh
脚本从命令行创建生产者。 它需要 Kafka 服务器的主机名、端口和主题名称作为参数。
通过键入以下内容将字符串 "Hello, World"
发布到 TutorialTopic
主题:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,您可以使用 kafka-console-consumer.sh
脚本创建 Kafka 消费者。 它需要 ZooKeeper 服务器的主机名和端口,以及主题名称作为参数。
以下命令使用来自 TutorialTopic
的消息。 注意 --from-beginning
标志的使用,它允许消费在消费者启动之前发布的消息:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您应该在终端中看到 Hello, World
:
OutputHello, World
该脚本将继续运行,等待更多消息发布到该主题。 随意打开一个新终端并启动一个生产者来发布更多消息。 您应该能够在消费者的输出中看到它们。
完成测试后,按 CTRL+C
停止消费者脚本。 现在我们已经测试了安装,让我们继续安装 KafkaT。
第 6 步 — 安装 KafkaT(可选)
KafkaT 是 Airbnb 的一款工具,可让您更轻松地查看有关 Kafka 集群的详细信息并从命令行执行某些管理任务。 因为它是一个 Ruby gem,所以您需要 Ruby 才能使用它。 您还需要 ruby-devel
和与构建相关的包,例如 make
和 gcc
才能构建它所依赖的其他 gem。 使用 yum
安装它们:
sudo yum install ruby ruby-devel make gcc patch
您现在可以使用 gem 命令安装 KafkaT:
sudo gem install kafkat
KafkaT 使用 .kafkatcfg
作为配置文件来确定您的 Kafka 服务器的安装和日志目录。 它还应该有一个将 KafkaT 指向 ZooKeeper 实例的条目。
创建一个名为 .kafkatcfg
的新文件:
vi ~/.kafkatcfg
添加以下行以指定有关 Kafka 服务器和 Zookeeper 实例的所需信息:
~/.kafkatcfg
{ "kafka_path": "~/kafka", "log_path": "/tmp/kafka-logs", "zk_path": "localhost:2181" }
完成编辑后保存并关闭文件。
您现在可以使用 KafkaT。 首先,您可以使用它来查看有关所有 Kafka 分区的详细信息:
kafkat partitions
您将看到以下输出:
OutputTopic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] ... ...
您将看到 TutorialTopic
以及 __consumer_offsets
,这是 Kafka 用于存储客户端相关信息的内部主题。 您可以放心地忽略以 __consumer_offsets
开头的行。
要了解有关 KafkaT 的更多信息,请参阅其 GitHub 存储库。
第 7 步 — 设置多节点集群(可选)
如果要使用更多 CentOS 7 机器创建多代理集群,则应在每台新机器上重复步骤 1、步骤 4 和步骤 5。 此外,您应该在 server.properties
文件中对每个进行以下更改:
- 应该更改
broker.id
属性的值,使其在整个集群中是唯一的。 此属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。 例如,"server1"
、"server2"
等。 - 应该更改
zookeeper.connect
属性的值,以使所有节点都指向同一个 ZooKeeper 实例。 此属性指定 Zookeeper 实例的地址并遵循<HOSTNAME/IP_ADDRESS>:<PORT>
格式。 例如,"203.0.113.0:2181"
、"203.0.113.1:2181"
等。
如果您希望集群有多个 ZooKeeper 实例,则每个节点上的 zookeeper.connect
属性的值应该是相同的逗号分隔字符串,列出所有 ZooKeeper 实例的 IP 地址和端口号。
第 8 步——限制 Kafka 用户
现在所有安装都已完成,您可以删除 kafka 用户的管理员权限。 在您这样做之前,请注销并以任何其他非 root sudo 用户身份重新登录。 如果您仍在运行本教程开始时使用的同一 shell 会话,只需键入 exit
。
从 sudo 组中删除 kafka 用户:
sudo gpasswd -d kafka wheel
为了进一步提高 Kafka 服务器的安全性,请使用 passwd
命令锁定 kafka 用户的密码。 这确保没有人可以使用此帐户直接登录服务器:
sudo passwd kafka -l
此时,只有 root 或 sudo
用户可以通过键入以下命令以 kafka
身份登录:
sudo su - kafka
以后如果要解锁,请使用 passwd
和 -u
选项:
sudo passwd kafka -u
您现在已成功限制 kafka 用户的管理员权限。
结论
现在,Apache Kafka 在 CentOS 服务器上安全运行。 您可以通过使用 Kafka 客户端 创建 Kafka 生产者和消费者来在您的项目中使用它,这些客户端可用于大多数编程语言。 要了解更多关于 Kafka 的信息,您还可以查阅其 文档。