-
Notifications
You must be signed in to change notification settings - Fork 732
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b226c37
commit aad5fdc
Showing
4 changed files
with
297 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
<?php | ||
|
||
namespace Elastica\Tool; | ||
|
||
use Elastica\Bulk; | ||
use Elastica\Index; | ||
use Elastica\Query\MatchAll; | ||
use Elastica\ScanAndScroll; | ||
use Elastica\Search; | ||
use Elastica\Type; | ||
|
||
/** | ||
* Functions to move documents and types between indices | ||
* | ||
* @author Manuel Andreo Garcia <[email protected]> | ||
*/ | ||
class CrossIndex | ||
{ | ||
/** | ||
* Type option | ||
* | ||
* type: string | string[] | \Elastica\Type | \Elastica\Type[] | null | ||
* default: null (means all types) | ||
*/ | ||
const OPTION_TYPE = 'type'; | ||
|
||
/** | ||
* Query option | ||
* | ||
* type: see \Elastica\Query::create() | ||
* default: Elastica\Query\MatchAll | ||
*/ | ||
const OPTION_QUERY = 'query'; | ||
|
||
/** | ||
* Expiry time option | ||
* | ||
* type: string (see Elastica\ScanAndScroll) | ||
* default: '1m' | ||
*/ | ||
const OPTION_EXPIRY_TIME = 'expiryTime'; | ||
|
||
/** | ||
* Size per shard option | ||
* | ||
* type: int (see Elastica\ScanAndScroll) | ||
* default: 1000 | ||
*/ | ||
const OPTION_SIZE_PER_SHARD = 'sizePerShard'; | ||
|
||
/** | ||
* Reindex documents from an old index to a new index | ||
* | ||
* @link https://www.elastic.co/guide/en/elasticsearch/guide/master/reindex.html | ||
* | ||
* @param \Elastica\Index $oldIndex | ||
* @param \Elastica\Index $newIndex | ||
* @param array $options keys: CrossIndex::OPTION_* constants | ||
* | ||
* @return \Elastica\Index The new index object | ||
*/ | ||
public static function reindex( | ||
Index $oldIndex, | ||
Index $newIndex, | ||
array $options = array() | ||
) { | ||
// prepare search | ||
$search = new Search($oldIndex->getClient()); | ||
|
||
$options = array_merge( | ||
array( | ||
self::OPTION_TYPE => null, | ||
self::OPTION_QUERY => new MatchAll(), | ||
self::OPTION_EXPIRY_TIME => '1m', | ||
self::OPTION_SIZE_PER_SHARD => 1000, | ||
), | ||
$options | ||
); | ||
|
||
$search->addIndex($oldIndex); | ||
if (isset($options[self::OPTION_TYPE])) { | ||
$type = $options[self::OPTION_TYPE]; | ||
$search->addTypes(is_array($type) ? $type : array($type)); | ||
} | ||
$search->setQuery($options[self::OPTION_QUERY]); | ||
|
||
// search on old index and bulk insert in new index | ||
$scanAndScroll = new ScanAndScroll( | ||
$search, | ||
$options[self::OPTION_EXPIRY_TIME], | ||
$options[self::OPTION_SIZE_PER_SHARD] | ||
); | ||
foreach ($scanAndScroll as $resultSet) { | ||
$bulk = new Bulk($newIndex->getClient()); | ||
$bulk->setIndex($newIndex); | ||
|
||
foreach ($resultSet as $result) { | ||
$action = new Bulk\Action(); | ||
$action->setType($result->getType()); | ||
$action->setId($result->getId()); | ||
$action->setSource($result->getData()); | ||
|
||
$bulk->addAction($action); | ||
} | ||
|
||
$bulk->send(); | ||
} | ||
|
||
$newIndex->refresh(); | ||
|
||
return $newIndex; | ||
} | ||
|
||
/** | ||
* Copies type mappings and documents from an old index to a new index | ||
* | ||
* @see \Elastica\Tool\CrossIndex::reindex() | ||
* | ||
* @param \Elastica\Index $oldIndex | ||
* @param \Elastica\Index $newIndex | ||
* @param array $options keys: CrossIndex::OPTION_* constants | ||
* | ||
* @return \Elastica\Index The new index object | ||
*/ | ||
public static function copy( | ||
Index $oldIndex, | ||
Index $newIndex, | ||
array $options = array() | ||
) { | ||
// normalize types to array of string | ||
$types = array(); | ||
if (isset($options[self::OPTION_TYPE])) { | ||
$types = $options[self::OPTION_TYPE]; | ||
$types = is_array($types) ? $types : array($types); | ||
|
||
$types = array_map( | ||
function ($type) { | ||
if ($type instanceof Type) { | ||
$type = $type->getName(); | ||
} | ||
|
||
return (string) $type; | ||
}, | ||
$types | ||
); | ||
} | ||
|
||
// copy mapping | ||
foreach ($oldIndex->getMapping() as $type => $mapping) { | ||
if (!empty($types) && !in_array($type, $types, true)) { | ||
continue; | ||
} | ||
|
||
$type = new Type($newIndex, $type); | ||
$type->setMapping($mapping['properties']); | ||
} | ||
|
||
// copy documents | ||
return self::reindex($oldIndex, $newIndex, $options); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ | |
|
||
namespace Elastica; | ||
|
||
use Elastica\Bulk\Action; | ||
/** | ||
* Elastica tools | ||
* | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
<?php | ||
|
||
namespace Elastica\Test\Tool; | ||
|
||
use Elastica\Document; | ||
use Elastica\Test\Base; | ||
use Elastica\Tool\CrossIndex; | ||
use Elastica\Type; | ||
|
||
class CrossIndexTest extends Base | ||
{ | ||
/** | ||
* Test default reindex | ||
*/ | ||
public function testReindex() | ||
{ | ||
$oldIndex = $this->_createIndex(null, true, 2); | ||
$this->_addDocs($oldIndex->getType('crossIndexTest'), 10); | ||
|
||
$newIndex = $this->_createIndex(null, true, 2); | ||
|
||
$this->assertInstanceOf( | ||
'Elastica\Index', | ||
CrossIndex::reindex($oldIndex, $newIndex) | ||
); | ||
|
||
$this->assertEquals(10, $newIndex->count()); | ||
} | ||
|
||
/** | ||
* Test reindex type option | ||
*/ | ||
public function testReindexTypeOption() | ||
{ | ||
$oldIndex = $this->_createIndex('', true, 2); | ||
$type1 = $oldIndex->getType('crossIndexTest_1'); | ||
$type2 = $oldIndex->getType('crossIndexTest_2'); | ||
|
||
$docs1 = $this->_addDocs($type1, 10); | ||
$docs2 = $this->_addDocs($type2, 10); | ||
|
||
$newIndex = $this->_createIndex(null, true, 2); | ||
|
||
// \Elastica\Type | ||
CrossIndex::reindex($oldIndex, $newIndex, array( | ||
CrossIndex::OPTION_TYPE => $type1, | ||
)); | ||
$this->assertEquals(10, $newIndex->count()); | ||
$newIndex->deleteDocuments($docs1); | ||
|
||
// string | ||
CrossIndex::reindex($oldIndex, $newIndex, array( | ||
CrossIndex::OPTION_TYPE => 'crossIndexTest_2', | ||
)); | ||
$this->assertEquals(10, $newIndex->count()); | ||
$newIndex->deleteDocuments($docs2); | ||
|
||
// array | ||
CrossIndex::reindex($oldIndex, $newIndex, array( | ||
CrossIndex::OPTION_TYPE => array( | ||
'crossIndexTest_1', | ||
$type2, | ||
), | ||
)); | ||
$this->assertEquals(20, $newIndex->count()); | ||
} | ||
|
||
/** | ||
* Test default copy | ||
*/ | ||
public function testCopy() | ||
{ | ||
$oldIndex = $this->_createIndex(null, true, 2); | ||
$newIndex = $this->_createIndex(null, true, 2); | ||
|
||
$oldType = $oldIndex->getType('copy_test'); | ||
$oldMapping = array( | ||
'name' => array( | ||
'type' => 'string', | ||
'store' => true, | ||
), | ||
); | ||
$oldType->setMapping($oldMapping); | ||
$docs = $this->_addDocs($oldType, 10); | ||
|
||
// mapping | ||
$this->assertInstanceOf( | ||
'Elastica\Index', | ||
CrossIndex::copy($oldIndex, $newIndex) | ||
); | ||
|
||
$newMapping = $newIndex->getType('copy_test')->getMapping(); | ||
if (!isset($newMapping['copy_test']['properties']['name'])) { | ||
$this->fail('could not request new mapping'); | ||
} | ||
|
||
$this->assertEquals( | ||
$oldMapping['name'], | ||
$newMapping['copy_test']['properties']['name'] | ||
); | ||
|
||
// document copy | ||
$this->assertEquals(10, $newIndex->count()); | ||
$newIndex->deleteDocuments($docs); | ||
|
||
// ignore mapping | ||
$ignoredType = $oldIndex->getType('copy_test_1'); | ||
$this->_addDocs($ignoredType, 10); | ||
|
||
CrossIndex::copy($oldIndex, $newIndex, array( | ||
CrossIndex::OPTION_TYPE => $oldType, | ||
)); | ||
|
||
$this->assertFalse($newIndex->getType($ignoredType->getName())->exists()); | ||
$this->assertEquals(10, $newIndex->count()); | ||
} | ||
|
||
/** | ||
* @param Type $type | ||
* @param int $docs | ||
* | ||
* @return array | ||
*/ | ||
private function _addDocs(Type $type, $docs) | ||
{ | ||
$insert = array(); | ||
for ($i = 1; $i <= $docs; $i++) { | ||
$insert[] = new Document($i, array('_id' => $i, 'key' => 'value')); | ||
} | ||
|
||
$type->addDocuments($insert); | ||
$type->getIndex()->refresh(); | ||
|
||
return $insert; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters