ES大数据量同步,如何又快又稳?

ES大数据量同步,如何又快又稳?

Scroll Down

背景

需求要根据商家名称、经纬度对商家列表进行地理位置检索和全文检索,目前这部分数据是缓存在 MongoDB 上,虽然说 MongoDB 支持全文索引,但是对于中文检索,MongoDB 似乎做的还不是非常好,线上也会经常出现查询超时的情况。所以经过考虑研究后,决定改用 Elasticsearch 作为搜索引擎。

目前线上的百万数据需要同步到 Elasticsearch,那么如何能够安全又快速的实现大数据量的同步呢?

方案验证

测试数据量: 200w

方案一:单线程同步

线程数耗时
11h

单线程跑对于应用来说是非常安全的,但是在时间效率上,确实不太能容忍。

方案二:多线程

把同步数据分成多份,使用多线程同步

与上面单线程相比,这个速率可以说是一个质的飞跃。

线程数耗时
45 min
54 min
82.5 min
161.6 min

线程类型分为:CPU 密集型IO 密集型

很显然这是属于 IO 类型,瓶颈应该是在 MySQL 和 ES 的 IO 上,所以理论上线程数越多处理速度越快,这也跟上面的数据符合一致。

当然也不是线程数越多越好,CPU 核数就那么几个,线程数多了上下文切换的耗时也会增加,应用的其他业务也会被影响到。所以在全局的考虑下,选择最合适的 8 个线程数。

因为要等待所有的线程数完成任务,才算是完成整个同步的任务。这里使用了 CountDownLatch 作为计数器:

countDownLatch = new CountDownLatch(8);
taskExecutor.submit(() -> {
          try {
            // 业务逻辑
            doSomething();
          } finally {
            countDownLatch.countDown();
          }
});
countDownLatch.await();
log.info("task done!");

当然,在这个过程中还遇到 RestHighLevelClient 抛出的一个异常:

Caused by: java.util.concurrent.TimeoutException: Connection lease request time out

RestHighLevelClient 的底层其实是 HttpClient ,如果有使用过 HttpClient 的同学,看到这个异常应该也不陌生。

这里有三个比较重要的参数:

  • connectionRequestTimout 从连接池获取连接的timeout
  • connetionTimeout 客户端和服务器建立连接的timeout
  • socketTimeout 客户端和服务器建立连接后,客户端从服务器读取数据的timeout

很显然 connectionRequestTimout 配置的超时时间太短了,后面边改为 10 s 就解决了这个问题。

方案三:多线程+多线程

要知道整个同步过程其实就是 :

  1. 读取 MySQL 数据
  2. 写入 ES

肥壕打印了这两个步骤的耗时,发现步骤二的占用时间是70%-80%,那有办法能够提高步骤二的速度吗?

肥壕这时有个大胆的想法,如果把步骤二也分成多个线程呢?

这里使用 Semaphore 来控制 ES写入的并发线程数:

doSomething() {
  Semaphore semaphore = new Semaphore(2);
  try {
		semaphore.acquire();
    taskExecutor.submit(() -> {
      // ES写入
      write2Es()
      semaphore.release();
    });
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
ES 写入线程数耗时
21.5 min
41.3 min

(8 个任务线程数)上面的数据可以看到,ES 写入线程数为 2 和 4 的时候区别不大,应该是 ES 线程数为 2 时整体耗时应该跟 MySQL 的读取耗时差不多。

总结与思考

多线程的方案思想和 MapReduce 很像,就是把一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。这个思想与第一个单线程的方案相比,速度上肯定是无法比拟的。

这里还有一个问题,就是多个线程要保证完全高效利用:

  • 比如,在上面 MySQL 数据的拆分时,肥壕是按照自增 id 来划分,所以理论上每个任务的任务量是一致的。

    但是在测试库中,会有数据删除之类的操作,会出现划分不均匀的情况,所以这时每个线程分配的数据量就可能不太一样,导致个别线程会耗时比较长。

  • 有的线程优先完成了任务,那能否帮助其他线程分摊一下任务?

了解过一下 Fork/Join 框架,貌似这个好像能解决上述的问题,等肥壕研究一番后再做讨论吧。

当然,如果有更好的提议和方案,非常欢迎多多指教!

普通的改变,将改变普通

我是肥壕,一个在互联网低调前行的小青年

欢迎微信搜一搜「卜几岛」,点击关注,阅读更多分享好文