如何在Ubuntu16.04上使用Transporter将转换后的数据从MongoDB同步到Elasticsearch

来自菜鸟教程
跳转至:导航、​搜索

介绍

Transporter 是一个开源工具,用于在不同的数据存储之间移动数据。 开发人员经常为跨数据库移动数据、将数据从文件移动到数据库或反之亦然等任务编写一次性脚本,但使用 Transporter 之类的工具有几个优点。

在 Transporter 中,您构建 pipelines,它定义了从 source(读取数据的地方)到 sink(写入数据的地方)的数据流)。 源和接收器可以是 SQL 或 NoSQL 数据库、平面文件或其他资源。 Transporter 使用 适配器 ,这是可插入的扩展,与这些资源进行通信,该项目默认包括 几个适配器 用于流行的数据库。

除了移动数据之外,Transporter 还允许您在数据通过管道时使用 transformer 更改数据。 与适配器一样,默认包含几个变压器。 您还可以编写自己的转换器来自定义数据的修改。

在本教程中,我们将介绍使用 Transporter 的内置适配器和用 JavaScript 编写的自定义转换器将数据从 MongoDB 数据库移动和处理到 Elasticsearch 的示例。

先决条件

要遵循本教程,您将需要:

传输器管道是用 JavaScript 编写的。 您不需要任何 JavaScript 知识或经验即可学习本教程,但您可以在 这些 JavaScript 教程 中了解更多信息。

第 1 步 — 安装 Transporter

Transporter 为最常见的操作系统提供二进制文件。 Ubuntu 的安装过程包括两个步骤:下载 Linux 二进制文件并使其可执行。

首先,从 Transporter 在 GitHub 上的最新发布页面获取最新版本的链接。 复制以 -linux-amd64 结尾的链接。 本教程使用 v0.5.2,这是撰写本文时的最新版本。

将二进制文件下载到您的主目录中。

cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

把它移到 /usr/local/bin 或者你喜欢的安装目录。

mv transporter-*-linux-amd64 /usr/local/bin/transporter

然后使其可执行,以便您可以运行它。

chmod +x /usr/local/bin/transporter

您可以通过运行二进制文件来测试 Transporter 是否设置正确。

transporter

您将看到使用帮助输出和版本号:

OutputUSAGE
  transporter <command> [flags]

COMMANDS
  run       run pipeline loaded from a file
  . . .

VERSION
  0.5.2

为了使用 Transporter 将数据从 MongoDB 移动到 Elasticsearch,我们需要两件事:我们想要移动的 MongoDB 中的数据和一个告诉 Transporter 如何移动它的管道。 下一步会创建一些示例数据,但如果您已经有一个要移动的 MongoDB 数据库,则可以跳过下一步直接进入第 3 步。

第 2 步 — 将示例数据添加到 MongoDB(可选)

在这一步中,我们将在 MongoDB 中创建一个包含单个集合的示例数据库,并将一些文档添加到该集合中。 然后,在本教程的其余部分,我们将使用 Transporter 管道迁移和转换此示例数据。

首先,连接到您的 MongoDB 数据库。

mongo

这会将您的提示更改为 mongo>,表明您正在使用 MongoDB shell。

从这里,选择要处理的数据库。 我们称之为 my_application

use my_application

MongoDB 中,您不需要显式创建数据库或集合。 一旦您开始将数据添加到您按名称选择的数据库中,该数据库将自动创建。

因此,要创建 my_application 数据库,请将两个文档保存到其 users 集合中:一个代表 Sammy Shark,另一个代表 Gilly Glowfish。 这将是我们的测试数据。

db.users.save({"firstName": "Sammy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

添加文档后,您可以查询 users 集合以查看您的记录。

db.users.find().pretty();

输出看起来类似于下面的输出,但 _id 列会有所不同。 MongoDB 自动添加对象 ID 以唯一标识集合中的文档。

output{
  "_id" : ObjectId("59299ac7f80b31254a916456"),
  "firstName" : "Sammy",
  "lastName" : "Shark"
}
{
  "_id" : ObjectId("59299ac7f80b31254a916457"),
  "firstName" : "Gilly",
  "lastName" : "Glowfish"
}

CTRL+C 退出 MongoDB shell。

接下来,让我们创建一个 Transporter 管道,将这些数据从 MongoDB 移动到 Elasticsearch。

第 3 步 - 创建基本管道

Transporter 中的管道默认由名为 pipeline.js 的 JavaScript 文件定义。 内置的 init 命令在给定源和接收器的正确目录中创建一个基本的 配置文件

初始化一个 starter pipeline.js,其中 MongoDB 作为源,Elasticsearch 作为接收器。

transporter init mongodb elasticsearch

您将看到以下输出:

OutputWriting pipeline.js...

您无需为此步骤修改 pipeline.js,但让我们看看它是如何工作的。

文件看起来是这样的,但是你也可以使用命令cat pipeline.jsless pipeline.js查看文件的内容(按q退出less),或者用你最喜欢的文本编辑器打开它。

管道.js

var source = mongodb({
  "uri": "${MONGODB_URI}"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{}",
  // "read_preference": "Primary"
})

var sink = elasticsearch({
  "uri": "${ELASTICSEARCH_URI}"
  // "timeout": "10s", // defaults to 30s
  // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
  // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
  // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

var sourcevar sink 开头的行分别为 MongoDB 和 Elasticsearch 适配器定义了 JavaScript 变量。 我们将在此步骤稍后定义这些适配器所需的 MONGODB_URIELASTICSEARCH_URI 环境变量。

// 开头的行是注释。 它们突出显示了您可以为管道设置的一些常见配置选项,但我们没有将它们用于我们在此处创建的基本管道。

最后一行连接源和接收器。 变量 transportert 让我们可以访问我们的管道。 我们使用 .Source().Save() 函数 使用之前在文件中定义的 sourcesink 变量添加源和接收器.

Source()Save() 函数的第三个参数是 namespace. 传递 /.*/ 作为最后一个参数意味着我们要从 MongoDB 传输所有数据并且将其保存在 Elasticsearch 中的同一命名空间下。

在运行此管道之前,我们需要为 MongoDB URIElasticsearch URI 设置 环境变量 。 在我们使用的示例中,两者都使用默认设置在本地托管,但如果您使用现有的 MongoDB 或 Elasticsearch 实例,请确保自定义这些选项。

export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'

现在我们准备好运行管道了。

transporter run pipeline.js

你会看到这样结束的输出:

Output. . .
INFO[0001] metrics source records: 2                     path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2                path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch]   ts=1522942118483396878

在倒数第二行和倒数第三行中,此输出表明源中存在 2 条记录,并且 2 条记录已移至接收器。

要确认两条记录都已处理,您可以在 Elasticsearch 中查询 my_application 数据库的内容,该数据库现在应该存在。

curl $ELASTICSEARCH_URI/_search?pretty=true

?pretty=true 参数使输出更易于阅读:

Output{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

MongoDB 中的数据库和集合类似于 Elasticsearch 中的索引和类型。 考虑到这一点,您应该看到:

  • _index 字段设置为 my_application, 原始 MongoDB 数据库的名称)。
  • _type 字段设置为 users, MongoDB 集合的名称。
  • firstNamelastName 字段分别填写了“Sammy”“Shark”和“Gilly”“Glowfish”。

这证实了来自 MongoDB 的两条记录都已通过 Transporter 成功处理并加载到 Elasticsearch。 为了构建这个基本管道,我们将添加一个可以转换输入数据的中间处理步骤。

第四步——创建一个变压器

顾名思义,transformers 在将源数据加载到接收器之前对其进行修改。 例如,它们允许您添加新字段、删除字段或更改字段数据。 Transporter 带有一些预定义的转换器以及对自定义转换器的支持。

通常,自定义转换器被编写为 JavaScript 函数并保存在单独的文件中。 要使用它们,请在 pipeline.js 中添加对转换器文件的引用。 Transporter 包括 Otto 和 Goja JavaScript 引擎。 因为 Goja 更新并且通常更快,所以我们将在这里使用它。 唯一的功能区别是语法。

创建一个名为 transform.js 的文件,我们将使用它来编写我们的转换函数。

nano transform.js

这是我们将使用的函数,它将创建一个名为 fullName 的新字段,其值将是 firstNamelastName 字段连接在一起,用空格分隔(如 Sammy Shark)。

转换.js

function transform(msg) {
    msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
    return msg
}

让我们看一下这个文件的行:

保存并关闭文件。

接下来,我们需要修改管道以使用这个转换器。 打开 pipeline.js 文件进行编辑。

nano pipeline.js

在最后一行,我们需要添加对 Transform() 函数的调用,以将转换器添加到对 Source()Save() 的调用之间的管道中,如下所示:

~/transporter/pipeline.js

. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")

传递给 Transform() 的参数是转换的类型,在本例中为 Goja。 使用 goja 函数,我们使用其 相对路径 指定转换器的文件名。

保存并关闭文件。 在我们重新运行管道测试转换器之前,让我们从之前的测试中清除 Elasticsearch 中的现有数据。

curl -XDELETE $ELASTICSEARCH_URI

您将看到此输出确认命令成功。

Output{"acknowledged":true}

现在重新运行管道。

transporter run pipeline.js

输出看起来与之前的测试非常相似,您可以在最后几行中看到管道是否像以前一样成功完成。 可以肯定的是,我们可以再次检查 Elasticsearch 以查看数据是否以我们期望的格式存在。

curl $ELASTICSEARCH_URI/_search?pretty=true

您可以在新输出中看到 fullName 字段:

Output{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "fullName" : "Gilly Glowfish",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "fullName" : "Sammy Shark",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

请注意,两个文档中都添加了 fullName 字段,并正确设置了值。 有了这个,现在我们知道如何将自定义转换添加到 Transporter 管道。

结论

您已经使用转换器构建了一个基本的 Transporter 管道,用于将数据从 MongoDB 复制和修改到 Elasticsearch。 您可以以相同的方式应用更复杂的转换,在同一管道中链接多个转换,等等。 MongoDB 和 Elasticsearch 只是 Transporter 支持的两个适配器。 它还支持平面文件、Postgres 等 SQL 数据库以及许多其他数据源。

您可以查看 GitHub 上的 Transporter 项目以随时了解 API 的最新更改,并访问 Transporter wiki 了解有关如何使用适配器、变压器和变压器的其他功能。