如何在CentOS7上备份、导入和迁移ApacheKafka数据
作为 Write for DOnations 计划的一部分,作者选择了 Tech Education Fund 来接受捐赠。
介绍
备份您的 Apache Kafka 数据是一项重要的实践,可帮助您从意外的数据丢失或由于用户错误而添加到集群的不良数据中恢复。 集群和主题数据的数据转储是执行备份和恢复的有效方式。
当您的 Kafka 实例由于服务器硬件或网络故障而变得不可用并且您需要使用旧数据创建新的 Kafka 实例时,将备份的数据导入并迁移到单独的服务器会很有帮助。 由于资源使用情况发生变化,当您将 Kafka 实例移动到升级或降级的服务器时,导入和迁移备份数据也很有用。
在本教程中,您将在单个 CentOS 7 安装以及不同服务器上的多个 CentOS 7 安装上备份、导入和迁移 Kafka 数据。 ZooKeeper 是 Kafka 运行的关键组件。 它存储有关集群状态的信息,例如消费者数据、分区数据和集群中其他代理的状态。 因此,您还将在本教程中备份 ZooKeeper 的数据。
先决条件
要继续进行,您将需要:
- 按照 本教程 设置的具有至少 4GB RAM 和非 root sudo 用户的 CentOS 7 服务器。
- 安装了 Apache Kafka 的 CentOS 7 服务器,用作备份源。 如果源服务器上尚未安装 Kafka,请按照 如何在 CentOS 7 上安装 Apache Kafka 指南设置您的 Kafka 安装。
- OpenJDK 8 安装在服务器上。 要安装此版本,请按照这些 说明 安装特定版本的 OpenJDK。
- 第 7 步可选 — 另一个安装了 Apache Kafka 的 CentOS 7 服务器,作为备份的目的地。 按照上一个先决条件中的文章链接在目标服务器上安装 Kafka。 仅当您将 Kafka 数据从一台服务器移动到另一台服务器时,才需要此先决条件。 如果要将 Kafka 数据备份并导入到单个服务器,则可以跳过此先决条件。
第 1 步 — 创建测试主题并添加消息
Kafka 消息 是 Kafka 中最基本的数据存储单元,是您将向 Kafka 发布和订阅的实体。 Kafka topic 就像是一组相关消息的容器。 当您订阅特定主题时,您将仅收到发布到该特定主题的消息。 在本节中,您将登录到要备份的服务器(源服务器)并添加 Kafka 主题和消息,以便为备份填充一些数据。
本教程假设您已将 Kafka 安装在 kafka 用户 (/home/kafka/kafka
) 的主目录中。 如果您的安装位于不同的目录中,请使用您的 Kafka 安装路径以及本教程其余部分的命令修改以下命令中的 ~/kafka
部分。
通过执行以下命令通过 SSH 连接到源服务器:
ssh sammy@source_server_ip
运行以下命令以 kafka 用户登录:
sudo -iu kafka
使用 Kafka 安装的 bin 目录中的 kafka-topics.sh
shell 实用程序文件创建一个名为 BackupTopic
的主题,方法是键入:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BackupTopic
使用 ~/kafka/bin/kafka-console-producer.sh
shell 实用程序脚本将字符串 "Test Message 1"
发布到 BackupTopic
主题。
如果您想在此处添加其他消息,您现在可以这样做。
echo "Test Message 1" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic BackupTopic > /dev/null
~/kafka/bin/kafka-console-producer.sh
文件允许您直接从命令行发布消息。 通常,您会在程序中使用 Kafka 客户端库发布消息,但由于这涉及针对不同编程语言的不同设置,因此您可以使用 shell 脚本作为一种独立于语言的方式在测试期间或执行管理任务时发布消息。 --topic
标志指定您将发布消息的主题。
接下来,通过运行以下命令验证 kafka-console-producer.sh
脚本是否已发布消息:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning
~/kafka/bin/kafka-console-consumer.sh
shell 脚本启动消费者。 一旦启动,它将订阅来自您在上一个命令中的 "Test Message 1"
消息中发布的主题的消息。 命令中的 --from-beginning
标志允许消费在消费者启动之前发布的消息。 如果未启用该标志,则只会出现在消费者启动后发布的消息。 运行命令时,您将在终端中看到以下输出:
OutputTest Message 1
按 CTRL+C
停止消费者。
您已经创建了一些测试数据并验证了它是持久的。 现在您可以在下一节中备份状态数据。
第 2 步 — 备份 ZooKeeper 状态数据
在备份实际的 Kafka 数据之前,您需要备份存储在 ZooKeeper 中的集群状态。
ZooKeeper 将其数据存储在 ~/kafka/config/zookeeper.properties
配置文件中 dataDir
字段指定的目录中。 您需要读取该字段的值以确定要备份的目录。 默认情况下,dataDir
指向/tmp/zookeeper
目录。 如果您的安装中的值不同,请在以下命令中将 /tmp/zookeeper
替换为该值。
以下是 ~/kafka/config/zookeeper.properties
文件的示例输出:
~/kafka/config/zookeeper.properties
... ... ... # the directory where the snapshot is stored. dataDir=/tmp/zookeeper # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 ... ... ...
现在您有了目录的路径,您可以创建其内容的压缩存档文件。 压缩存档文件是比常规存档文件更好的选择,可以节省磁盘空间。 运行以下命令:
tar -czf /home/kafka/zookeeper-backup.tar.gz /tmp/zookeeper/*
该命令的输出 tar: Removing leading / from member names
您可以放心地忽略。
-c
和 -z
标志告诉 tar
创建存档并对存档应用 gzip 压缩。 -f
标志指定输出压缩归档文件的名称,在本例中为 zookeeper-backup.tar.gz
。
您可以在当前目录中运行 ls
以查看 zookeeper-backup.tar.gz
作为输出的一部分。
您现在已成功备份 ZooKeeper 数据。 在下一节中,您将备份实际的 Kafka 数据。
第 3 步 — 备份 Kafka 主题和消息
在本节中,您将像在上一步中为 ZooKeeper 所做的那样,将 Kafka 的数据目录备份到一个压缩的 tar 文件中。
Kafka 将主题、消息和内部文件存储在 ~/kafka/config/server.properties
配置文件中 log.dirs
字段指定的目录中。 您需要读取该字段的值以确定要备份的目录。 默认情况下,在您当前的安装中,log.dirs
指向 /tmp/kafka-logs
目录。 如果您安装的值不同,请将以下命令中的 /tmp/kafka-logs
替换为正确的值。
以下是 ~/kafka/config/server.properties
文件的示例输出:
~/kafka/config/server.properties
... ... ... ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/tmp/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ... ... ...
首先,停止Kafka服务,使log.dirs
目录下的数据在用tar
创建归档时保持一致状态。 为此,请键入 exit
返回到服务器的非 root 用户,然后运行以下命令:
sudo systemctl stop kafka
停止 Kafka 服务后,以 kafka 用户身份重新登录:
sudo -iu kafka
有必要以非 root sudo 用户身份停止/启动 Kafka 和 ZooKeeper 服务,因为在 Apache Kafka 安装先决条件中,您将 kafka 用户限制为安全预防措施。 先决条件中的此步骤禁用 kafka 用户的 sudo 访问权限,从而导致命令无法执行。
现在,通过运行以下命令创建目录内容的压缩存档文件:
tar -czf /home/kafka/kafka-backup.tar.gz /tmp/kafka-logs/*
再一次,您可以放心地忽略命令的输出 (tar: Removing leading / from member names
)。
您可以在当前目录中运行 ls
以查看 kafka-backup.tar.gz
作为输出的一部分。
您可以再次启动 Kafka 服务 - 如果您不想立即恢复数据 - 通过键入 exit
切换到非 root sudo 用户,然后运行:
sudo systemctl start kafka
以您的 kafka 用户身份重新登录:
sudo -iu kafka
您已成功备份 Kafka 数据。 您现在可以继续下一部分,您将在其中恢复存储在 ZooKeeper 中的集群状态数据。
第 4 步 — 恢复 ZooKeeper 数据
在本节中,您将在用户执行创建主题、添加/删除额外节点以及添加和消费消息等操作时恢复 Kafka 在内部创建和管理的集群状态数据。 您将通过删除 ZooKeeper 数据目录并恢复 zookeeper-backup.tar.gz
文件的内容,将数据恢复到现有的源安装。 如果要将数据恢复到不同的服务器,请参见步骤 7。
您需要停止 Kafka 和 ZooKeeper 服务,以防数据目录在恢复过程中收到无效数据。
首先,通过键入 exit
来停止 Kafka 服务,以切换到您的非 root sudo 用户,然后运行:
sudo systemctl stop kafka
接下来,停止 ZooKeeper 服务:
sudo systemctl stop zookeeper
以您的 kafka 用户身份重新登录:
sudo -iu kafka
然后,您可以使用以下命令安全地删除现有集群数据目录:
rm -r /tmp/zookeeper/*
现在恢复您在第 2 步中备份的数据:
tar -C /tmp/zookeeper -xzf /home/kafka/zookeeper-backup.tar.gz --strip-components 2
-C
标志告诉 tar
在提取数据之前更改到目录 /tmp/zookeeper
。 您指定 --strip 2
标志以使 tar
在 /tmp/zookeeper/
本身中提取存档的内容,而不是在其中的另一个目录(例如 /tmp/zookeeper/tmp/zookeeper/
)中。
您已成功恢复集群状态数据。 现在,您可以继续下一节中的 Kafka 数据恢复过程。
第 5 步 — 恢复 Kafka 数据
在本节中,您将通过删除 Kafka 数据目录并恢复压缩存档文件,将备份的 Kafka 数据恢复到现有源安装(或目标服务器,如果您已执行可选步骤 7)。 这将允许您验证恢复是否成功。
您可以使用以下命令安全地删除现有的 Kafka 数据目录:
rm -r /tmp/kafka-logs/*
现在您已经删除了数据,您的 Kafka 安装类似于全新安装,其中没有主题或消息。 要恢复备份的数据,请运行以下命令提取文件:
tar -C /tmp/kafka-logs -xzf /home/kafka/kafka-backup.tar.gz --strip-components 2
-C
标志告诉 tar
在提取数据之前更改到目录 /tmp/kafka-logs
。 您指定 --strip 2
标志以确保存档的内容提取到 /tmp/kafka-logs/
本身,而不是在其中的另一个目录(例如 /tmp/kafka-logs/kafka-logs/
)中。
现在您已经成功提取数据,您可以通过键入 exit
再次启动 Kafka 和 ZooKeeper 服务,切换到您的非 root sudo 用户,然后执行:
sudo systemctl start kafka
使用以下命令启动 ZooKeeper 服务:
sudo systemctl start zookeeper
以您的 kafka 用户身份重新登录:
sudo -iu kafka
您已经恢复了 kafka
数据,您可以继续在下一节中验证恢复是否成功。
第 6 步 — 验证恢复
为了测试 Kafka 数据的恢复,您将使用您在步骤 1 中创建的主题中的消息。
等待几分钟让Kafka启动,然后执行以下命令从BackupTopic
读取消息:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning
如果收到如下警告,则需要等待 Kafka 完全启动:
Output[2018-09-13 15:52:45,234] WARN [Consumer clientId=consumer-1, groupId=console-consumer-87747] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
再过几分钟重试上一个命令,或以非 root sudo 用户身份运行 sudo systemctl restart kafka
。 如果恢复没有问题,您将看到以下输出:
OutputTest Message 1
如果您没有看到此消息,您可以检查是否遗漏了上一节中的任何命令并执行它们。
现在您已经验证了恢复的 Kafka 数据,这意味着您已经在单个 Kafka 安装中成功备份和恢复了数据。 您可以继续执行第 7 步,了解如何将集群和主题数据迁移到另一台服务器上的安装。
第 7 步 — 将备份迁移和恢复到另一个 Kafka 服务器(可选)
在本节中,您会将备份的数据从源 Kafka 服务器迁移到目标 Kafka 服务器。 为此,您将首先使用 scp
命令将压缩的 tar.gz
文件下载到本地系统。 然后,您将再次使用 scp
将文件推送到目标服务器。 一旦文件出现在目标服务器中,您就可以按照之前使用的步骤来恢复备份并验证迁移是否成功。
您正在本地下载备份文件,然后将它们上传到目标服务器,而不是将其直接从源复制到目标服务器,因为目标服务器的 /home/sammy/.ssh/authorized_keys
文件中没有源服务器的 SSH 密钥,并且无法连接到源服务器和从源服务器连接。 但是,您的本地计算机可以连接到两台服务器,从而为您节省了设置从源服务器到目标服务器的 SSH 访问的额外步骤。
通过执行以下命令将 zookeeper-backup.tar.gz
和 kafka-backup.tar.gz
文件下载到本地计算机:
scp sammy@source_server_ip:/home/kafka/zookeeper-backup.tar.gz .
您将看到类似于以下内容的输出:
Outputzookeeper-backup.tar.gz 100% 68KB 128.0KB/s 00:00
现在运行以下命令将 kafka-backup.tar.gz
文件下载到本地计算机:
scp sammy@source_server_ip:/home/kafka/kafka-backup.tar.gz .
您将看到以下输出:
Outputkafka-backup.tar.gz 100% 1031KB 488.3KB/s 00:02
在本地计算机的当前目录中运行 ls
,您将看到两个文件:
Outputkafka-backup.tar.gz zookeeper.tar.gz
运行以下命令将zookeeper-backup.tar.gz
文件传输到目的服务器的/home/kafka/
:
scp zookeeper-backup.tar.gz sammy@destination_server_ip:/home/sammy/zookeeper-backup.tar.gz
现在运行以下命令将 kafka-backup.tar.gz
文件传输到目标服务器的 /home/kafka/
:
scp kafka-backup.tar.gz sammy@destination_server_ip:/home/sammy/kafka-backup.tar.gz
您已成功将备份文件上传到目标服务器。 由于文件在 /home/sammy/
目录下,并且没有 kafka 用户访问的正确权限,您可以将文件移动到 /home/kafka/
目录并更改它们权限。
通过执行以下命令通过 SSH 连接到目标服务器:
ssh sammy@destination_server_ip
现在通过执行将 zookeeper-backup.tar.gz
移动到 /home/kafka/
:
sudo mv zookeeper-backup.tar.gz /home/sammy/zookeeper-backup.tar.gz
同样,运行以下命令将kafka-backup.tar.gz
复制到/home/kafka/
:
sudo mv kafka-backup.tar.gz /home/kafka/kafka-backup.tar.gz
通过运行以下命令更改备份文件的所有者:
sudo chown kafka /home/kafka/zookeeper-backup.tar.gz /home/kafka/kafka-backup.tar.gz
之前的 mv
和 chown
命令不会显示任何输出。
现在备份文件存在于目标服务器的正确目录中,请按照本教程的步骤 4 到 6 中列出的命令来恢复和验证目标服务器的数据。
结论
在本教程中,您备份、导入和迁移了来自同一安装和不同服务器上的安装的 Kafka 主题和消息。 如果您想了解更多关于 Kafka 中其他有用的管理任务的信息,可以参考 Kafka 官方文档的 操作 部分。
要远程存储zookeeper-backup.tar.gz
和kafka-backup.tar.gz
等备份文件,您可以探索Digital Ocean Spaces。 如果 Kafka 是您服务器上运行的唯一服务,您还可以探索其他备份方法,例如完整实例 备份 。