如何在Ubuntu16.04上使用Transporter将转换后的数据从MongoDB同步到Elasticsearch
介绍
Transporter 是一个开源工具,用于在不同的数据存储之间移动数据。 开发人员经常为跨数据库移动数据、将数据从文件移动到数据库或反之亦然等任务编写一次性脚本,但使用 Transporter 之类的工具有几个优点。
在 Transporter 中,您构建 pipelines,它定义了从 source(读取数据的地方)到 sink(写入数据的地方)的数据流)。 源和接收器可以是 SQL 或 NoSQL 数据库、平面文件或其他资源。 Transporter 使用 适配器 ,这是可插入的扩展,与这些资源进行通信,该项目默认包括 几个适配器 用于流行的数据库。
除了移动数据之外,Transporter 还允许您在数据通过管道时使用 transformer 更改数据。 与适配器一样,默认包含几个变压器。 您还可以编写自己的转换器来自定义数据的修改。
在本教程中,我们将介绍使用 Transporter 的内置适配器和用 JavaScript 编写的自定义转换器将数据从 MongoDB 数据库移动和处理到 Elasticsearch 的示例。
先决条件
要遵循本教程,您将需要:
- 按照这个Ubuntu 16.04初始服务器设置教程设置一台Ubuntu 16.04服务器,包括sudo非root用户和防火墙。
- 按照 this MongoDB on Ubuntu 16.04 tutorial 安装 MongoDB,或现有的 MongoDB 安装。
- 按照 this Elasticsearch on Ubuntu 16.04 tutorial 安装 Elasticsearch,或现有的 Elasticsearch 安装。
传输器管道是用 JavaScript 编写的。 您不需要任何 JavaScript 知识或经验即可学习本教程,但您可以在 这些 JavaScript 教程 中了解更多信息。
第 1 步 — 安装 Transporter
Transporter 为最常见的操作系统提供二进制文件。 Ubuntu 的安装过程包括两个步骤:下载 Linux 二进制文件并使其可执行。
首先,从 Transporter 在 GitHub 上的最新发布页面获取最新版本的链接。 复制以 -linux-amd64
结尾的链接。 本教程使用 v0.5.2,这是撰写本文时的最新版本。
将二进制文件下载到您的主目录中。
cd wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
把它移到 /usr/local/bin 或者你喜欢的安装目录。
mv transporter-*-linux-amd64 /usr/local/bin/transporter
然后使其可执行,以便您可以运行它。
chmod +x /usr/local/bin/transporter
您可以通过运行二进制文件来测试 Transporter 是否设置正确。
transporter
您将看到使用帮助输出和版本号:
OutputUSAGE transporter <command> [flags] COMMANDS run run pipeline loaded from a file . . . VERSION 0.5.2
为了使用 Transporter 将数据从 MongoDB 移动到 Elasticsearch,我们需要两件事:我们想要移动的 MongoDB 中的数据和一个告诉 Transporter 如何移动它的管道。 下一步会创建一些示例数据,但如果您已经有一个要移动的 MongoDB 数据库,则可以跳过下一步直接进入第 3 步。
第 2 步 — 将示例数据添加到 MongoDB(可选)
在这一步中,我们将在 MongoDB 中创建一个包含单个集合的示例数据库,并将一些文档添加到该集合中。 然后,在本教程的其余部分,我们将使用 Transporter 管道迁移和转换此示例数据。
首先,连接到您的 MongoDB 数据库。
mongo
这会将您的提示更改为 mongo>
,表明您正在使用 MongoDB shell。
从这里,选择要处理的数据库。 我们称之为 my_application
。
use my_application
在 MongoDB
中,您不需要显式创建数据库或集合。 一旦您开始将数据添加到您按名称选择的数据库中,该数据库将自动创建。
因此,要创建 my_application
数据库,请将两个文档保存到其 users
集合中:一个代表 Sammy Shark,另一个代表 Gilly Glowfish。 这将是我们的测试数据。
db.users.save({"firstName": "Sammy", "lastName": "Shark"}); db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
添加文档后,您可以查询 users
集合以查看您的记录。
db.users.find().pretty();
输出看起来类似于下面的输出,但 _id
列会有所不同。 MongoDB 自动添加对象 ID 以唯一标识集合中的文档。
output{ "_id" : ObjectId("59299ac7f80b31254a916456"), "firstName" : "Sammy", "lastName" : "Shark" } { "_id" : ObjectId("59299ac7f80b31254a916457"), "firstName" : "Gilly", "lastName" : "Glowfish" }
按 CTRL+C
退出 MongoDB shell。
接下来,让我们创建一个 Transporter 管道,将这些数据从 MongoDB 移动到 Elasticsearch。
第 3 步 - 创建基本管道
Transporter 中的管道默认由名为 pipeline.js
的 JavaScript 文件定义。 内置的 init
命令在给定源和接收器的正确目录中创建一个基本的 配置文件 。
初始化一个 starter pipeline.js
,其中 MongoDB 作为源,Elasticsearch 作为接收器。
transporter init mongodb elasticsearch
您将看到以下输出:
OutputWriting pipeline.js...
您无需为此步骤修改 pipeline.js
,但让我们看看它是如何工作的。
文件看起来是这样的,但是你也可以使用命令cat pipeline.js
,less pipeline.js
查看文件的内容(按q
退出less
),或者用你最喜欢的文本编辑器打开它。
管道.js
var source = mongodb({ "uri": "${MONGODB_URI}" // "timeout": "30s", // "tail": false, // "ssl": false, // "cacerts": ["/path/to/cert.pem"], // "wc": 1, // "fsync": false, // "bulk": false, // "collection_filters": "{}", // "read_preference": "Primary" }) var sink = elasticsearch({ "uri": "${ELASTICSEARCH_URI}" // "timeout": "10s", // defaults to 30s // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch }) t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
以 var source
和 var sink
开头的行分别为 MongoDB 和 Elasticsearch 适配器定义了 JavaScript 变量。 我们将在此步骤稍后定义这些适配器所需的 MONGODB_URI
和 ELASTICSEARCH_URI
环境变量。
以 //
开头的行是注释。 它们突出显示了您可以为管道设置的一些常见配置选项,但我们没有将它们用于我们在此处创建的基本管道。
最后一行连接源和接收器。 变量 transporter
或 t
让我们可以访问我们的管道。 我们使用 .Source()
和 .Save()
函数 使用之前在文件中定义的 source
和 sink
变量添加源和接收器.
Source()
和 Save()
函数的第三个参数是 namespace.
传递 /.*/
作为最后一个参数意味着我们要从 MongoDB 传输所有数据并且将其保存在 Elasticsearch 中的同一命名空间下。
在运行此管道之前,我们需要为 MongoDB URI 和 Elasticsearch URI 设置 环境变量 。 在我们使用的示例中,两者都使用默认设置在本地托管,但如果您使用现有的 MongoDB 或 Elasticsearch 实例,请确保自定义这些选项。
export MONGODB_URI='mongodb://localhost/my_application' export ELASTICSEARCH_URI='http://localhost:9200/my_application'
现在我们准备好运行管道了。
transporter run pipeline.js
你会看到这样结束的输出:
Output. . . INFO[0001] metrics source records: 2 path=source ts=1522942118483391242 INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960 INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
在倒数第二行和倒数第三行中,此输出表明源中存在 2 条记录,并且 2 条记录已移至接收器。
要确认两条记录都已处理,您可以在 Elasticsearch 中查询 my_application
数据库的内容,该数据库现在应该存在。
curl $ELASTICSEARCH_URI/_search?pretty=true
?pretty=true
参数使输出更易于阅读:
Output{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "lastName" : "Shark" } } ] } }
MongoDB 中的数据库和集合类似于 Elasticsearch 中的索引和类型。 考虑到这一点,您应该看到:
_index
字段设置为my_application,
原始 MongoDB 数据库的名称)。_type
字段设置为users,
MongoDB 集合的名称。firstName
和lastName
字段分别填写了“Sammy”“Shark”和“Gilly”“Glowfish”。
这证实了来自 MongoDB 的两条记录都已通过 Transporter 成功处理并加载到 Elasticsearch。 为了构建这个基本管道,我们将添加一个可以转换输入数据的中间处理步骤。
第四步——创建一个变压器
顾名思义,transformers 在将源数据加载到接收器之前对其进行修改。 例如,它们允许您添加新字段、删除字段或更改字段数据。 Transporter 带有一些预定义的转换器以及对自定义转换器的支持。
通常,自定义转换器被编写为 JavaScript 函数并保存在单独的文件中。 要使用它们,请在 pipeline.js
中添加对转换器文件的引用。 Transporter 包括 Otto 和 Goja JavaScript 引擎。 因为 Goja 更新并且通常更快,所以我们将在这里使用它。 唯一的功能区别是语法。
创建一个名为 transform.js
的文件,我们将使用它来编写我们的转换函数。
nano transform.js
这是我们将使用的函数,它将创建一个名为 fullName
的新字段,其值将是 firstName
和 lastName
字段连接在一起,用空格分隔(如 Sammy Shark
)。
转换.js
function transform(msg) { msg.data.fullName = msg.data.firstName + " " + msg.data.lastName; return msg }
让我们看一下这个文件的行:
- 文件的第一行,
function transform(msg),
是函数定义。 msg
是一个 JavaScript 对象,其中包含源文档的详细信息。 我们使用这个对象来访问通过管道的数据。- 函数 的第一行连接 两个现有字段,并且 将该值 分配给新的
fullName
字段。 - 函数的最后一行返回新修改的
msg
对象,以供管道的其余部分使用。
保存并关闭文件。
接下来,我们需要修改管道以使用这个转换器。 打开 pipeline.js
文件进行编辑。
nano pipeline.js
在最后一行,我们需要添加对 Transform()
函数的调用,以将转换器添加到对 Source()
和 Save()
的调用之间的管道中,如下所示:
~/transporter/pipeline.js
. . . t.Source("source", source, "/.*/") .Transform(goja({"filename": "transform.js"})) .Save("sink", sink, "/.*/")
传递给 Transform()
的参数是转换的类型,在本例中为 Goja。 使用 goja
函数,我们使用其 相对路径 指定转换器的文件名。
保存并关闭文件。 在我们重新运行管道测试转换器之前,让我们从之前的测试中清除 Elasticsearch 中的现有数据。
curl -XDELETE $ELASTICSEARCH_URI
您将看到此输出确认命令成功。
Output{"acknowledged":true}
现在重新运行管道。
transporter run pipeline.js
输出看起来与之前的测试非常相似,您可以在最后几行中看到管道是否像以前一样成功完成。 可以肯定的是,我们可以再次检查 Elasticsearch 以查看数据是否以我们期望的格式存在。
curl $ELASTICSEARCH_URI/_search?pretty=true
您可以在新输出中看到 fullName
字段:
Output{ "took" : 9, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "fullName" : "Gilly Glowfish", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "fullName" : "Sammy Shark", "lastName" : "Shark" } } ] } }
请注意,两个文档中都添加了 fullName
字段,并正确设置了值。 有了这个,现在我们知道如何将自定义转换添加到 Transporter 管道。
结论
您已经使用转换器构建了一个基本的 Transporter 管道,用于将数据从 MongoDB 复制和修改到 Elasticsearch。 您可以以相同的方式应用更复杂的转换,在同一管道中链接多个转换,等等。 MongoDB 和 Elasticsearch 只是 Transporter 支持的两个适配器。 它还支持平面文件、Postgres 等 SQL 数据库以及许多其他数据源。
您可以查看 GitHub 上的 Transporter 项目以随时了解 API 的最新更改,并访问 Transporter wiki 了解有关如何使用适配器、变压器和变压器的其他功能。