悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于 write_condition 机制的其实都是提供的乐观锁。

乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行 retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。

Elasticsearch 大部分场景下都是一个读多写少的系统,如果按照悲观锁的策略,会大大降低 Elasticsearch 的吞吐。

对于 Elasticsearch 并发写入,最好的方式是在设计上就排除并发问题,但有时不可避免会发生并发写入,
先看一下什么是并发插入的问题,多个进程同时得到一个用户的数据,然后同时插入 Elasticsearch ,如果不加锁,后到的数据是会覆盖掉前面的数据,实际我们想要的是,如果存在并发插入,那么第二条数据应该是以更新的方式添加的,而不是覆盖。

那如何实现呢?

在插入时,使用 es 提供的 IndexRequest.create(true) 方法,标记同一个时刻插入的数据,只会有一条数据插入成功,插入失败的会抛出文档已经存在的异常,那么应用程序端捕捉异常在代码里控制重试插入。重试时候会判断该条数据是否已经存在,如果存在就更新。

更新时候遇到并发问题处理,主要有2种思路:

  1. 如果是针对某个数值做累加或者减,可以使用es服务端冲突重试机制解决,这个方式比较简单,不需要 我们在程序中处理并发逻辑,我们所需要做的就是评估同一条数据的并发程度,然后设置合理重试次数就行,在重试之后如果仍然失败就会抛出异常,然后我们针对做处理。

  2. 可以通过 es 内部维护的 version 字段来自定义实现灵活控制的乐观锁。

当第一次插入一条数据成功时,es返回的reponse里面会给出当前这条数据的_version=1,如果我们更新这条数据前,读取这条数据当前的version=1,然后在更新时候只有携带的version=1时才能更新成功,如果更新成功version会加1,同一时刻当有两个进程都携带version=1去更新数据,最终只会有一条数据更新成功,只要更新成功version会累加=2,然后其他进程会更新失败,报版本冲突,因为最新是2,其他的都是1,所以更新失败,会抛出冲突异常:

{
   "error": {
      "root_cause": [
         {
            "type": "version_conflict_engine_exception",
            "reason": "[blog][1]: version conflict, current [2], provided [1]",
            "index": "website",
            "shard": "3"
         }
      ],
      "type": "version_conflict_engine_exception",
      "reason": "[blog][1]: version conflict, current [2], provided [1]",
      "index": "website",
      "shard": "3"
   },
   "status": 409
}

内部维护的version可以在更新和删除的api时使用。

下面我们看一下使用外部version来控制乐观锁,上面的version每次更新成功的+1操作都是es内部维护的,除此之外我们还可以使用外部自定义维护的版本进行插入,删除,更新操作:

PUT /website/blog/2?version=5&version_type=external
{
  "title": "My first external blog entry",
  "text":  "Starting to get the hang of this..."
}

结果

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 5,
  "created":  true
}

现在我们指定version=10去更新后,返回的新响应如下:

PUT /website/blog/2?version=10&version_type=external
{
  "title": "My first external blog entry",
  "text":  "This is a piece of cake..."
}
//===================
 
{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 10,
  "created":  false
}

如果再次执行上面的那个请求就会失败,因为新版本必须大于已经存在的版本号。

利用这个特性,我们也可以将时间戳当做版本,传进去,能保证当前的数据只有是最新的数据才能插入更新。