Skip to content

Commit

Permalink
Merge pull request #853 from webdevsHub/reindex
Browse files Browse the repository at this point in the history
Added Tool\CrossIndex class
  • Loading branch information
ruflin committed May 29, 2015
2 parents 1eee419 + aad5fdc commit 37a9893
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 2 deletions.
161 changes: 161 additions & 0 deletions lib/Elastica/Tool/CrossIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
<?php

namespace Elastica\Tool;

use Elastica\Bulk;
use Elastica\Index;
use Elastica\Query\MatchAll;
use Elastica\ScanAndScroll;
use Elastica\Search;
use Elastica\Type;

/**
* Functions to move documents and types between indices
*
* @author Manuel Andreo Garcia <[email protected]>
*/
class CrossIndex
{
    /**
     * Type option
     *
     * Restricts the operation to the given type(s).
     *
     * type: string | string[] | \Elastica\Type | \Elastica\Type[] | null
     * default: null (means all types)
     */
    const OPTION_TYPE = 'type';

    /**
     * Query option
     *
     * Selects which documents of the old index are transferred.
     *
     * type: see \Elastica\Query::create()
     * default: Elastica\Query\MatchAll
     */
    const OPTION_QUERY = 'query';

    /**
     * Expiry time option
     *
     * Passed as second constructor argument to Elastica\ScanAndScroll.
     *
     * type: string (see Elastica\ScanAndScroll)
     * default: '1m'
     */
    const OPTION_EXPIRY_TIME = 'expiryTime';

    /**
     * Size per shard option
     *
     * Passed as third constructor argument to Elastica\ScanAndScroll.
     *
     * type: int (see Elastica\ScanAndScroll)
     * default: 1000
     */
    const OPTION_SIZE_PER_SHARD = 'sizePerShard';

    /**
     * Reindex documents from an old index to a new index
     *
     * Searches the old index (optionally limited by type and query), iterates
     * over the result pages with ScanAndScroll and writes every hit into the
     * new index with one bulk request per page. Type, id and source of each
     * hit are carried over. The new index is refreshed before it is returned.
     *
     * @link https://www.elastic.co/guide/en/elasticsearch/guide/master/reindex.html
     *
     * @param \Elastica\Index $oldIndex Index the documents are read from
     * @param \Elastica\Index $newIndex Index the documents are written to
     * @param array           $options  keys: CrossIndex::OPTION_* constants
     *
     * @return \Elastica\Index The new index object
     */
    public static function reindex(
        Index $oldIndex,
        Index $newIndex,
        array $options = array()
    ) {
        // prepare search
        $search = new Search($oldIndex->getClient());

        // caller supplied options win over the defaults
        $options = array_merge(
            array(
                self::OPTION_TYPE => null,
                self::OPTION_QUERY => new MatchAll(),
                self::OPTION_EXPIRY_TIME => '1m',
                self::OPTION_SIZE_PER_SHARD => 1000,
            ),
            $options
        );

        $search->addIndex($oldIndex);
        if (isset($options[self::OPTION_TYPE])) {
            $type = $options[self::OPTION_TYPE];
            $search->addTypes(is_array($type) ? $type : array($type));
        }
        $search->setQuery($options[self::OPTION_QUERY]);

        // search on old index and bulk insert in new index
        $scanAndScroll = new ScanAndScroll(
            $search,
            $options[self::OPTION_EXPIRY_TIME],
            $options[self::OPTION_SIZE_PER_SHARD]
        );
        foreach ($scanAndScroll as $resultSet) {
            $bulk = new Bulk($newIndex->getClient());
            $bulk->setIndex($newIndex);

            $hasActions = false;
            foreach ($resultSet as $result) {
                $action = new Bulk\Action();
                $action->setType($result->getType());
                $action->setId($result->getId());
                $action->setSource($result->getData());

                $bulk->addAction($action);
                $hasActions = true;
            }

            // Defensive: a page without hits would otherwise produce a bulk
            // request with an empty body, so only send when there is work.
            if ($hasActions) {
                $bulk->send();
            }
        }

        $newIndex->refresh();

        return $newIndex;
    }

    /**
     * Copies type mappings and documents from an old index to a new index
     *
     * For every type of the old index (or only those named by
     * CrossIndex::OPTION_TYPE) the 'properties' section of its mapping is
     * applied to the same-named type of the new index. Afterwards the
     * documents are transferred with reindex() using the same options.
     *
     * Note: only the 'properties' part of a mapping is copied; types whose
     * mapping has no 'properties' section are skipped during the mapping step.
     *
     * @see \Elastica\Tool\CrossIndex::reindex()
     *
     * @param \Elastica\Index $oldIndex Index mappings and documents are read from
     * @param \Elastica\Index $newIndex Index mappings and documents are written to
     * @param array           $options  keys: CrossIndex::OPTION_* constants
     *
     * @return \Elastica\Index The new index object
     */
    public static function copy(
        Index $oldIndex,
        Index $newIndex,
        array $options = array()
    ) {
        // normalize types to array of string
        $types = array();
        if (isset($options[self::OPTION_TYPE])) {
            $types = $options[self::OPTION_TYPE];
            $types = is_array($types) ? $types : array($types);

            $types = array_map(
                function ($type) {
                    if ($type instanceof Type) {
                        $type = $type->getName();
                    }

                    return (string) $type;
                },
                $types
            );
        }

        // copy mapping
        foreach ($oldIndex->getMapping() as $type => $mapping) {
            // PHP converts numeric-string array keys (e.g. '2015') to integers;
            // cast back so the strict comparison below still matches the
            // normalized (string) type names.
            $typeName = (string) $type;

            if (!empty($types) && !in_array($typeName, $types, true)) {
                continue;
            }

            // nothing to copy for a type without declared properties
            if (!isset($mapping['properties'])) {
                continue;
            }

            $newType = new Type($newIndex, $typeName);
            $newType->setMapping($mapping['properties']);
        }

        // copy documents
        return self::reindex($oldIndex, $newIndex, $options);
    }
}
1 change: 0 additions & 1 deletion lib/Elastica/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Elastica;

use Elastica\Bulk\Action;
/**
* Elastica tools
*
Expand Down
136 changes: 136 additions & 0 deletions test/lib/Elastica/Test/Tool/CrossIndexTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<?php

namespace Elastica\Test\Tool;

use Elastica\Document;
use Elastica\Test\Base;
use Elastica\Tool\CrossIndex;
use Elastica\Type;

class CrossIndexTest extends Base
{
    /**
     * Reindexing without options transfers every document
     */
    public function testReindex()
    {
        $source = $this->_createIndex(null, true, 2);
        $this->_addDocs($source->getType('crossIndexTest'), 10);

        $target = $this->_createIndex(null, true, 2);

        $returned = CrossIndex::reindex($source, $target);
        $this->assertInstanceOf('Elastica\Index', $returned);

        $this->assertEquals(10, $target->count());
    }

    /**
     * The type option accepts a Type object, a string or a mixed array
     */
    public function testReindexTypeOption()
    {
        $source = $this->_createIndex('', true, 2);
        $firstType = $source->getType('crossIndexTest_1');
        $secondType = $source->getType('crossIndexTest_2');

        $firstDocs = $this->_addDocs($firstType, 10);
        $secondDocs = $this->_addDocs($secondType, 10);

        $target = $this->_createIndex(null, true, 2);

        // option given as \Elastica\Type instance
        $asObject = array(CrossIndex::OPTION_TYPE => $firstType);
        CrossIndex::reindex($source, $target, $asObject);
        $this->assertEquals(10, $target->count());
        $target->deleteDocuments($firstDocs);

        // option given as type name
        $asString = array(CrossIndex::OPTION_TYPE => 'crossIndexTest_2');
        CrossIndex::reindex($source, $target, $asString);
        $this->assertEquals(10, $target->count());
        $target->deleteDocuments($secondDocs);

        // option given as array mixing name and instance
        $asArray = array(
            CrossIndex::OPTION_TYPE => array('crossIndexTest_1', $secondType),
        );
        CrossIndex::reindex($source, $target, $asArray);
        $this->assertEquals(20, $target->count());
    }

    /**
     * Copy transfers mapping and documents and honours the type option
     */
    public function testCopy()
    {
        $source = $this->_createIndex(null, true, 2);
        $target = $this->_createIndex(null, true, 2);

        $sourceType = $source->getType('copy_test');
        $properties = array(
            'name' => array(
                'type' => 'string',
                'store' => true,
            ),
        );
        $sourceType->setMapping($properties);
        $insertedDocs = $this->_addDocs($sourceType, 10);

        // the copy call hands back the target index
        $returned = CrossIndex::copy($source, $target);
        $this->assertInstanceOf('Elastica\Index', $returned);

        // the mapping must have arrived in the target index
        $copiedMapping = $target->getType('copy_test')->getMapping();
        $hasNameField = isset($copiedMapping['copy_test']['properties']['name']);
        if (!$hasNameField) {
            $this->fail('could not request new mapping');
        }

        $this->assertEquals(
            $properties['name'],
            $copiedMapping['copy_test']['properties']['name']
        );

        // the documents must have arrived as well
        $this->assertEquals(10, $target->count());
        $target->deleteDocuments($insertedDocs);

        // a type that is not requested must not be copied
        $skippedType = $source->getType('copy_test_1');
        $this->_addDocs($skippedType, 10);

        $onlySourceType = array(CrossIndex::OPTION_TYPE => $sourceType);
        CrossIndex::copy($source, $target, $onlySourceType);

        $skippedInTarget = $target->getType($skippedType->getName());
        $this->assertFalse($skippedInTarget->exists());
        $this->assertEquals(10, $target->count());
    }

    /**
     * Adds documents with ids 1..$docs to the type and refreshes its index
     *
     * @param Type $type Type the documents are added to
     * @param int  $docs Number of documents to create
     *
     * @return array The created documents
     */
    private function _addDocs(Type $type, $docs)
    {
        $documents = array();

        $id = 1;
        while ($id <= $docs) {
            $documents[] = new Document($id, array('_id' => $id, 'key' => 'value'));
            ++$id;
        }

        $type->addDocuments($documents);
        $type->getIndex()->refresh();

        return $documents;
    }
}
1 change: 0 additions & 1 deletion test/lib/Elastica/Test/UtilTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Elastica\Request;
use Elastica\Test\Base as BaseTest;
use Elastica\Util;
use Elastica\Exception\ResponseException;

class UtilTest extends BaseTest
{
Expand Down

0 comments on commit 37a9893

Please sign in to comment.