dump elasticsearch 2.x to mongodb and back to ES 6.x.

这个问题更多的是理论而不是源代码。

我有一个ES 2.x节点,其数据超过1.2TB。 我们有40多个索引,每个索引至少有一种类型。 这里,ES 2.x用作数据库而不是搜索引擎。 用于将数据转储到ES 2.x的源丢失。 此外,数据未规范化,但单个ES文档具有多个嵌入文档。 我们的目标是重新创建数据源,同时将其标准化。

我们计划的是:

  1. 从ES中检索数据,对其进行分析并将其转储到特定集合的新mongodb中,并维护数据之间的关系。 即。 以标准化forms保存。
  2. 在新的ES 6节点上索引新的mongo数据。

我们使用的是JRuby 9.1.15.0,Rails 5,Ruby 2.4和Sidekiq。

目前,我们正在从ES检索特定日期时间范围的数据。 有时我们收到0条记录,有时甚至是100000条。 问题是当我们收到大量记录时。

下面是一个示例脚本,当日期范围的数据很小但数据很大时失败时,它会起作用。 1.2TB / 40指数是平均指数的大小

class DataRetrieverWorker include Sidekiq::Worker include Sidekiq::Status::Worker def perform(indx_name, interval = 24, start_time = nil, end_time = nil) unless start_time || end_time client = ElasticSearchClient.instance.client last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first start_time, end_time = unless last_retrieved_at data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }] first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time']) start_time = first_day.beginning_of_day end_time = first_day.end_of_day else # retrieve for the next time slot. usually 24 hrs. [last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours] end DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time) else # start scroll on the specified range and retrieve data. query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } } data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query } ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time) if ri DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval) return end ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total']) if data['hits']['total'] > 0 if data['hits']['total'] > 2000 BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits']) while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits']) end else data['hits']['hits'].each do |r| schedule(r) ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name) end while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do data['hits']['hits'].each do |r| schedule(r) ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name) end end end else DataRetrieverWorker.perform_async(indx_name, interval) return end DataRetrieverWorker.perform_at(indx_name, interval) end end private def schedule(data) DataPersisterWorker.perform_async(data) end end 

问题:

  1. 什么应该是从ES 2.x中检索数据的理想方法。 我们通过日期范围检索数据,然后使用scroll api检索结果集。 这是正确的吗?
  2. 当我们在特定时间范围内获得大结果时应该怎么做。 有时,我们会在几分钟的时间范围内获得20000多条记录。 什么应该是理想的方法?
  3. sidekiq是适合这种数据处理的正确库吗?
  4. 什么应该是运行sidekiq的服务器的理想配置?
  5. 使用日期范围正确的方法来检索数据? 文件数量差异很大。 0或100000+。
  6. 有没有更好的方法可以让我获得无论时间范围多少的记录?
  7. 我尝试使用独立于时间范围的滚动api但是对于具有100cr记录的索引,是否使用大小为100的滚动(对于ES的api调用100个结果)? 8.指数中的数据不断增加。 没有任何文件更新。

我们测试了我们的代码,它每个日期时间范围(比如6小时)处理名义数据(比如说4-5k文档)。 我们还计划对数据进行分片。 由于我们需要在某些集合中添加/更新记录时执行一些ruby回调,因此我们将使用Mongoid。 在没有mongoid的mongodb中直接插入数据不是一种选择。

任何指针都会有所帮助。 谢谢。

在我看来,你应该假设这个过程在任何阶段都可能失败。

恕我直言,你不应该下载所有文件,而只是下载匹配日期范围文件的IDS。 这将显着减少ElasticSearch返回的数据量

使用这些IDS,您可以在后台执行另一个工作人员(让我们称之为ImporterWorker ),IDS作为输入,从ElasticSearch下载整个文档并将其导出到MongoDB。

此外,如果你让我们说1_000_000 IDS,你可以将它们分成N个较小的块(200 X 5_000)并排队N个作业。

优点:

  • 拆分成块 – 您没有从ElasticSearch获得高容量响应的风险,因为块大小决定了ElasticSearch响应的最大大小

  • 当某些东西出现故障(临时网络问题或任何其他问题)时,您将使用最初触发它的IDS重新运行ImporterWorker ,一切都可以正常工作而不会中断整个过程。 即使它失败了 – 你也会知道没有导入的确切IDS

  1. 什么应该是从ES 2.x中检索数据的理想方法。 我们通过日期范围检索数据,然后使用scroll api检索结果集。 这是正确的吗?

ES中的数据是否在不断增加?

  1. 当我们在特定时间范围内获得大结果时应该怎么做。 有时,我们会在几分钟的时间范围内获得20000多条记录。 什么应该是理想的方法?

你正在使用滚动API,这是一个很好的方法。 您可以尝试使用ES的Sliced Scroll API。

  1. sidekiq是适合这种数据处理的正确库吗?

是的sidekiq很好,可以处理这么多的数据。

  1. 什么应该是运行sidekiq的服务器的理想配置?

运行sidekiq的服务器的当前配置是什么?

  1. 使用日期范围正确的方法来检索数据? 文件数量差异很大。 0或100000+。

您一次不能持有100000多个结果。 您正在使用滚动API处理它们。 如果未在ES中继续添加数据,则使用带有scroll api的match_all: {}查询。 如果连续添加数据,则日期范围是很好的方法。

  1. 有没有更好的方法可以给我统一的记录数量而不管时间范围?

是的,如果您使用不使用日期范围。 使用滚动API扫描从0到最后的所有文档。

  1. 我尝试使用独立于时间范围的滚动api但是对于具有100cr记录的索引,是否使用大小为100的滚动(对于ES的api调用100个结果)?

您可以增加滚动大小,因为mongodb支持批量插入文档。 MongoDB批量插入

以下几点可以解决您的问题:

处理上一批处理后清除scroll_id可以提高性能。

  1. 当排序顺序为_doc时,滚动请求具有使其更快的优化。 如果要迭代所有文档而不管顺序,这是最有效的选项。

  2. scroll参数告诉Elasticsearch它应该保持搜索上下文的活动时间。 它的值(例如1m)不需要足够长以处理所有数据,它只需要足够长以处理前一批结果。 每个滚动请求都设置一个新的到期时间。

  3. 超过滚动超时时,将自动删除搜索上下文。 但是保持滚动打开会产生成本(稍后在性能部分讨论),因此只要使用clear-scroll API不再使用滚动,就应该显式清除滚动:

  4. Scroll API:后台合并过程通过将较小的段合并在一起来创建新的更大的段来优化索引,此时删除较小的段。 此过程在滚动期间继续,但打开的搜索上下文可防止旧片段在仍在使用时被删除。 这就是Elasticsearch能够返回初始搜索请求的结果,无论后续对文档的更改如何。 保持旧段保持活动意味着需要更多文件句柄。 确保已将节点配置为具有充足的空闲文件句柄,并且在获取数据后立即清除滚动API上下文。 我们可以使用nodes stats API检查打开了多少个搜索上下文:

因此,非常有必要清除Scroll API Context,如前面Clear Scroll API部分所述。

资源

Elasticdump是一个非常方便的工具,可以根据Elasticsearch查询备份/恢复或重新索引数据。

要备份完整索引,Elasticsearch快照API是正确的工具。 快照API提供了创建和恢复整个索引,存储在文件或Amazon S3存储桶中的快照的操作。

我们来看一下Elasticdump和快照备份和恢复的一些示例。

  1. 使用节点包管理器安装elasticdump

     npm i elasticdump -g 
  2. 通过查询备份到zip文件:

     elasticdump --input='http://username:password@localhost:9200/myindex' --searchBody '{"query" : {"range" :{"timestamp" : {"lte": 1483228800000}}}}' --output=$ --limit=1000 | gzip > /backups/myindex.gz 
  3. 从zip文件恢复:

     zcat /backups/myindex.gz | elasticdump --input=$ --output=http://username:password@localhost:9200/index_name 

具有到Amazon S3或文件的快照的备份和还原数据的示例

首先配置快照目标

  1. S3例子

     curl 'localhost:9200/_snapshot/my_repository?pretty' -XPUT -H 'Content-Type: application/json' -d '{ "type" : "s3", "settings" : { "bucket" : "test-bucket", "base_path" : "backup-2017-01", "max_restore_bytes_per_sec" : "1gb", "max_snapshot_bytes_per_sec" : "1gb", "compress" : "true", "access_key" : "", "secret_key" : "" } }' 
  2. 本地磁盘或已挂载的NFS示例

     curl 'localhost:9200/_snapshot/my_repository?pretty' -XPUT -H 'Content-Type: application/json' -d '{ "type" : "fs", "settings" : { "location": "" } }' 
  3. 触发快照

     curl -XPUT 'localhost:9200/_snapshot/my_repository/' 
  4. 显示所有备份

     curl 'localhost:9200/_snapshot/my_repository/_all' 
  5. 还原 – 备份中最重要的部分是validation备份还原是否真正有效!

     curl -XPOST 'localhost:9200/_snapshot/my_repository//_restore' 

本文发现于:
https://sematext.com/blog/elasticsearch-security-authentication-encryption-backup/

Interesting Posts