Skip to content

Commit

Permalink
Added Reindex and deprecated CrossIndex (ruflin#1311)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giovanni Albero committed May 21, 2017
1 parent fa694e0 commit 417d53e
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ All notable changes to this project will be documented in this file based on the
- Add support for Health parameters for Cluster\Health endpoint (new prop : delayed_unassigned_shards, number_of_pending_tasks, number_of_in_flight_fetch, task_max_waiting_in_queue_millis, active_shards_percent_as_number)
- Add support for querystring in Type. this allow to use `update_all_types` in type mapping in order to resolve conflicts between fields in different types. [Conflicts between fields in different types](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#merging-conflicts)
- Added `\Elastica\Query\ParentId` to avoid join with parent documents [#1287](https://github.com/ruflin/Elastica/issues/1287)
- Added `\Elastica\Reindex` for reindexing between indices [#1311](https://github.com/ruflin/Elastica/issues/1311)

### Improvements

- Added support for `other_bucket` and `other_bucket_key` paramters on `Elastica\Aggregation\Filters`

### Deprecated

- Deprecated `Tool\CrossIndex` use `\Elastica\Reindex` instead [#1311](https://github.com/ruflin/Elastica/issues/1311)

## [Unreleased](https://github.com/ruflin/Elastica/compare/5.1.0...5.2.0)

Expand Down
117 changes: 117 additions & 0 deletions lib/Elastica/Reindex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
<?php
namespace Elastica;

use Elastica\Query\AbstractQuery;

class Reindex
{
const VERSION_TYPE = 'version_type';
const VERSION_TYPE_INTERNAL = 'internal';
const VERSION_TYPE_EXTERNAL = 'external';
const OPERATION_TYPE = 'op_type';
const OPERATION_TYPE_CREATE = 'create';
const CONFLICTS = 'conflicts';
const CONFLICTS_PROCEED = 'proceed';
const TYPE = 'type';
const SIZE = 'size';
const QUERY = 'query';

public static function reindex(Index $oldIndex, Index $newIndex, array $options = [])
{
$body = self::getBody($oldIndex, $newIndex, $options);

$reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();
$reindexEndpoint->setBody($body);

$oldIndex->getClient()->requestEndpoint($reindexEndpoint);
$newIndex->refresh();

return $newIndex;
}

private static function getBody($oldIndex, $newIndex, $options)
{
$body = array_merge([
'source' => self::getSourcePartBody($oldIndex, $options),
'dest' => self::getDestPartBody($newIndex, $options)
], self::resolveBodyOptions($options));

return $body;
}

private static function getSourcePartBody(Index $index, array $options)
{
$sourceBody = array_merge([
'index' => $index->getName(),
], self::resolveSourceOptions($options));

$sourceBody = self::setSourceQuery($sourceBody);
$sourceBody = self::setSourceType($sourceBody);

return $sourceBody;
}

private static function getDestPartBody(Index $index, array $options)
{
return array_merge([
'index' => $index->getName(),
], self::resolveDestOptions($options));
}

private static function resolveSourceOptions(array $options)
{
return array_intersect_key($options, [
self::TYPE => null,
self::QUERY => null,
]);
}

private static function resolveDestOptions(array $options)
{
return array_intersect_key($options, [
self::VERSION_TYPE => null,
self::OPERATION_TYPE => null,
]);
}

private function resolveBodyOptions(array $options)
{
return array_intersect_key($options, [
self::SIZE => null,
self::CONFLICTS => null,
]);
}

/**
* @param array $sourceBody
*
* @return array
*/
private static function setSourceQuery(array $sourceBody)
{
if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
$sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
}
return $sourceBody;
}

/**
* @param array $sourceBody
*
* @return array
*/
private static function setSourceType(array $sourceBody)
{
if (isset($sourceBody[self::TYPE]) && !is_array($sourceBody[self::TYPE])) {
$sourceBody[self::TYPE] = [$sourceBody[self::TYPE]];
}
if (isset($sourceBody[self::TYPE])) {
foreach ($sourceBody[self::TYPE] as $key => $type) {
if ($type instanceof Type) {
$sourceBody[self::TYPE][$key] = $type->getName();
}
}
}
return $sourceBody;
}
}
2 changes: 2 additions & 0 deletions lib/Elastica/Tool/CrossIndex.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* Functions to move documents and types between indices.
*
* @author Manuel Andreo Garcia <[email protected]>
*
* @deprecated use Reindex instead. This class will be removed in further Elastica releases.
*/
class CrossIndex
{
Expand Down
171 changes: 171 additions & 0 deletions test/Elastica/ReindexTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
<?php
namespace Elastica\Test;

use Elastica\Document;
use Elastica\Index;
use Elastica\Query\Match;
use Elastica\Reindex;
use Elastica\Type;

class ReindexTest extends Base
{
/**
* Test default reindex.
*
* @group functional
*/
public function testReindex()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$this->_addDocs($oldIndex->getType('resetTest'), 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$newIndex = Reindex::reindex($oldIndex, $newIndex);
$this->assertInstanceOf(
Index::class,
$newIndex
);

$this->assertEquals(10, $newIndex->count());

$oldResult = [];

foreach ($oldIndex->search()->getResults() as $result) {
$oldResult[] = $result->getData();
}

$newResult = [];

foreach ($newIndex->search()->getResults() as $result) {
$newResult[] = $result->getData();
}

$this->assertEquals($oldResult, $newResult);
}

/**
* Test reindex type option.
*
* @group functional
*/
public function testReindexTypeOption()
{
$oldIndex = $this->_createIndex('', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');
$type2 = $oldIndex->getType('crossIndexTest_2');

$docs1 = $this->_addDocs($type1, 10);
$docs2 = $this->_addDocs($type2, 10);

$newIndex = $this->_createIndex(null, true, 2);

Reindex::reindex($oldIndex, $newIndex, [
Reindex::TYPE => 'crossIndexTest_1',
]);
$this->assertEquals(10, $newIndex->count());
$newIndex->deleteDocuments($docs1);

// string
Reindex::reindex($oldIndex, $newIndex, [
Reindex::TYPE => 'crossIndexTest_2',
]);
$this->assertEquals(10, $newIndex->count());
$newIndex->deleteDocuments($docs2);

// array
Reindex::reindex($oldIndex, $newIndex, [
Reindex::TYPE => [
$type1,
'crossIndexTest_2',
],
]);
$this->assertEquals(20, $newIndex->count());
}

/**
* @group functional
*/
public function testReindexOpTypeOptionWithProceedSetOnConflicts()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');

$docs1 = $this->_addDocs($type1, 10);

$subDocs1 = array_splice($docs1, 0, 5);

$newIndex = $this->_createIndex('idx2', true, 2);
$newIndex->addDocuments($subDocs1);
$newIndex->refresh();

$this->assertEquals(5, $newIndex->count());

Reindex::reindex($oldIndex, $newIndex, [
Reindex::OPERATION_TYPE => Reindex::OPERATION_TYPE_CREATE,
Reindex::CONFLICTS => Reindex::CONFLICTS_PROCEED,
]);

$this->assertEquals(10, $newIndex->count());
}

/**
* @group functional
*/
public function testReindexWithQueryOption()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');
$docs1 = $this->_addDocs($type1, 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$query = new Match('id', 8);
Reindex::reindex($oldIndex, $newIndex, [
Reindex::QUERY => $query,
]);

$results = $newIndex->search()->getResults();
$this->assertEquals(1, $newIndex->count());
foreach ($results as $result) {
$this->assertEquals($docs1[7]->getData(), $result->getData());
}
}

/**
* @group functional
*/
public function testReindexWithSizeOption()
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$type1 = $oldIndex->getType('crossIndexTest_1');
$this->_addDocs($type1, 10);

$newIndex = $this->_createIndex('idx2', true, 2);

Reindex::reindex($oldIndex, $newIndex, [
Reindex::SIZE => 5,
]);

$this->assertEquals(5, $newIndex->count());
}

/**
* @param Type $type
* @param int $docs
*
* @return array
*/
private function _addDocs(Type $type, $docs)
{
$insert = [];
for ($i = 1; $i <= $docs; ++$i) {
$insert[] = new Document($i, ['id' => $i, 'key' => 'value']);
}

$type->addDocuments($insert);
$type->getIndex()->refresh();

return $insert;
}
}

0 comments on commit 417d53e

Please sign in to comment.