Future 模式
editFuture 模式
edit客户端提供 future 模式(或叫异步模式)。future 模式允许批量发送请求(并行发送到集群),这对于提高性能和生产力有极大帮助。
PHP 是单线程的脚本语言,然而 libcurl 的 multi interface 功能使得像 PHP 这种单线程的语言可以批量发送请求,从而获得并发性特征。批量请求是通过底层的多线程 libcurl 库并行的发送请求给 Elasticsearch,而返回给PHP的数据也是批量的。
在单线程环境下,执行 n
个请求的时间等于 n
个请求时间相加。在 multi interface 功能下,执行 n
个请求的时间等于最慢的一个请求时间。
除此以外,multi-interface 功能允许批量请求同时发送到不同的主机,这意味着 Elasticsearch-PHP 可以更高效地利用集群。
使用 Future 模式
edit使用这种模式相对简单,只是你要写多一点代码。为了开启 future 模式,在 client 选项中增加 future
参数,并设置值为 'lazy'
:
$client = ClientBuilder::create()->build(); $params = [ 'index' => 'test', 'type' => 'test', 'id' => 1, 'client' => [ 'future' => 'lazy' ] ]; $future = $client->get($params);
这里会返回一个 future 对象,而不是真正的响应数据。future 对象是待处理对象,它看起来就像是个占位符。你可以把 future 对象当成是普通对象在代码中传递使用。当你需要响应数据时,你可以解析 future 对象。如果 future 对象已经被解析,可以立即使用响应数据。如果 future 对象还没被解析完,那么解析动作会阻塞 PHP 脚本的执行,直到解析完成。
在实际应用中,你可以通过设置 future: lazy
键值对构造一个请求队列,而返回的 future 对象直到解析完成,程序才会继续执行。无论什么时候,全部的请求都是以并行方式发送到集群,以异步方式返回给 curl。
这听起来好复杂,但由于RingPHP的 FutureArray
接口,这些操作则变得很简单。它让 future 对象看起来像是一个关联数组。例如:
$client = ClientBuilder::create()->build(); $params = [ 'index' => 'test', 'type' => 'test', 'id' => 1, 'client' => [ 'future' => 'lazy' ] ]; $future = $client->get($params); $doc = $future['_source']; // This call will block and force the future to resolve
就像通常的响应数据那样,future 对象可以用迭代关联数组的方式解析特定的值(轮流解析未解析的请求和值)。这样就可以写成如下形式:
$client = ClientBuilder::create()->build(); $futures = []; for ($i = 0; $i < 1000; $i++) { $params = [ 'index' => 'test', 'type' => 'test', 'id' => $i, 'client' => [ 'future' => 'lazy' ] ]; $futures[] = $client->get($params); //queue up the request } foreach ($futures as $future) { // access future's values, causing resolution if necessary echo $future['_source']; }
请求队列会并行执行,执行后赋值给 futures 数组。每批请求默认为 100 个。
如果你想强制解析 future 对象,但又不立刻获取响应数据。你可以用 future 对象的 wait()
方法来强制解析:
$client = ClientBuilder::create()->build(); $futures = []; for ($i = 0; $i < 1000; $i++) { $params = [ 'index' => 'test', 'type' => 'test', 'id' => $i, 'client' => [ 'future' => 'lazy' ] ]; $futures[] = $client->get($params); //queue up the request } //wait() forces future resolution and will execute the underlying curl batch $futures[999]->wait();
更改批量值
edit默认的批量值为 100 个,这意味着在客户端强制 future 对象解析前(执行 curl_multi
调用),队列可以容纳 100 个请求。批量值可以更改,取决于你的需求。批量值的调整是通过配置 HTTP handler 时设置 max_handles
参数来实现:
$handlerParams = [ 'max_handles' => 500 ]; $defaultHandler = ClientBuilder::defaultHandler($handlerParams); $client = ClientBuilder::create() ->setHandler($defaultHandler) ->build();
上面的设置会更改批量发送数量为 500。注意:不管队列数量是否为最大批量值,强制解析 future 对象都会引起底层的 curl 执行批量请求操作。在如下的示例中,只有 499 个对象加入队列,但最后的 future 对象被解析会引起强制发送批量请求:
$handlerParams = [ 'max_handles' => 500 ]; $defaultHandler = ClientBuilder::defaultHandler($handlerParams); $client = ClientBuilder::create() ->setHandler($defaultHandler) ->build(); $futures = []; for ($i = 0; $i < 499; $i++) { $params = [ 'index' => 'test', 'type' => 'test', 'id' => $i, 'client' => [ 'future' => 'lazy' ] ]; $futures[] = $client->get($params); //queue up the request } // resolve the future, and therefore the underlying batch $body = $future[499]['body'];
各种批量执行
edit队列里面允许存在各种请求。比如,你可以把 get 请求、index 请求和 search 请求放到队列里面:
$client = ClientBuilder::create()->build(); $futures = []; $params = [ 'index' => 'test', 'type' => 'test', 'id' => 1, 'client' => [ 'future' => 'lazy' ] ]; $futures['getRequest'] = $client->get($params); // First request $params = [ 'index' => 'test', 'type' => 'test', 'id' => 2, 'body' => [ 'field' => 'value' ], 'client' => [ 'future' => 'lazy' ] ]; $futures['indexRequest'] = $client->index($params); // Second request $params = [ 'index' => 'test', 'type' => 'test', 'body' => [ 'query' => [ 'match' => [ 'field' => 'value' ] ] ], 'client' => [ 'future' => 'lazy' ] ]; $futures['searchRequest'] = $client->search($params); // Third request // Resolve futures...blocks until network call completes $searchResults = $futures['searchRequest']['hits']; // Should return immediately, since the previous future resolved the entire batch $doc = $futures['getRequest']['_source'];
警告
edit使用 future 模式时需要注意几点。最大也是最明显的问题是:你要自己去解析 future 对象。这挺麻烦的,而且偶尔会引起一些意料不到的状况。
例如,假如你手动使用 wait()
方法解析,在需要重新构建 future 对象并解析的情况下,你也许要调用好几次 wait()
方法。这是因为每次重新构造 future 对象都会引起 future 对象的重新赋值(覆盖解析结果),所以每个 future 对象都要重新解析获取结果。
如果你使用 ArrayInterface 返回的结果( $response['hits']['hits']
)则不用进行额外处理。然而 FutureArrayInterface 就要全面解析 future 对象才能使用响应数据。
另外一点是一些方法会失效。比如 exists 方法( $client->exists()
, $client->indices()->exists
, $client->indices->templateExists()
等)在正常情况下会返回 true 或 false。
当使用 future 模式时,future 对象还未封装好,这代表客户端无法检测响应结果和返回 true 或 false。所以你会得到从 Elasticsearch 返回的未封装响应数据,而你不得不对这些数据进行处理。
这些注意事项也适用于 ping()
方法。