diff --git a/lib/Elastica/Tool/CrossIndex.php b/lib/Elastica/Tool/CrossIndex.php new file mode 100644 index 0000000000..89cc3d4ce8 --- /dev/null +++ b/lib/Elastica/Tool/CrossIndex.php @@ -0,0 +1,161 @@ + + */ +class CrossIndex +{ + /** + * Type option + * + * type: string | string[] | \Elastica\Type | \Elastica\Type[] | null + * default: null (means all types) + */ + const OPTION_TYPE = 'type'; + + /** + * Query option + * + * type: see \Elastica\Query::create() + * default: Elastica\Query\MatchAll + */ + const OPTION_QUERY = 'query'; + + /** + * Expiry time option + * + * type: string (see Elastica\ScanAndScroll) + * default: '1m' + */ + const OPTION_EXPIRY_TIME = 'expiryTime'; + + /** + * Size per shard option + * + * type: int (see Elastica\ScanAndScroll) + * default: 1000 + */ + const OPTION_SIZE_PER_SHARD = 'sizePerShard'; + + /** + * Reindex documents from an old index to a new index + * + * @link https://www.elastic.co/guide/en/elasticsearch/guide/master/reindex.html + * + * @param \Elastica\Index $oldIndex + * @param \Elastica\Index $newIndex + * @param array $options keys: CrossIndex::OPTION_* constants + * + * @return \Elastica\Index The new index object + */ + public static function reindex( + Index $oldIndex, + Index $newIndex, + array $options = array() + ) { + // prepare search + $search = new Search($oldIndex->getClient()); + + $options = array_merge( + array( + self::OPTION_TYPE => null, + self::OPTION_QUERY => new MatchAll(), + self::OPTION_EXPIRY_TIME => '1m', + self::OPTION_SIZE_PER_SHARD => 1000, + ), + $options + ); + + $search->addIndex($oldIndex); + if (isset($options[self::OPTION_TYPE])) { + $type = $options[self::OPTION_TYPE]; + $search->addTypes(is_array($type) ? $type : array($type)); + } + $search->setQuery($options[self::OPTION_QUERY]); + + // search on old index and bulk insert in new index + $scanAndScroll = new ScanAndScroll( + $search, + $options[self::OPTION_EXPIRY_TIME], + $options[self::OPTION_SIZE_PER_SHARD] + ); + foreach ($scanAndScroll as $resultSet) { + $bulk = new Bulk($newIndex->getClient()); + $bulk->setIndex($newIndex); + + foreach ($resultSet as $result) { + $action = new Bulk\Action(); + $action->setType($result->getType()); + $action->setId($result->getId()); + $action->setSource($result->getData()); + + $bulk->addAction($action); + } + + $bulk->send(); + } + + $newIndex->refresh(); + + return $newIndex; + } + + /** + * Copies type mappings and documents from an old index to a new index + * + * @see \Elastica\Tool\CrossIndex::reindex() + * + * @param \Elastica\Index $oldIndex + * @param \Elastica\Index $newIndex + * @param array $options keys: CrossIndex::OPTION_* constants + * + * @return \Elastica\Index The new index object + */ + public static function copy( + Index $oldIndex, + Index $newIndex, + array $options = array() + ) { + // normalize types to array of string + $types = array(); + if (isset($options[self::OPTION_TYPE])) { + $types = $options[self::OPTION_TYPE]; + $types = is_array($types) ? $types : array($types); + + $types = array_map( + function ($type) { + if ($type instanceof Type) { + $type = $type->getName(); + } + + return (string) $type; + }, + $types + ); + } + + // copy mapping + foreach ($oldIndex->getMapping() as $type => $mapping) { + if (!empty($types) && !in_array($type, $types, true)) { + continue; + } + + $type = new Type($newIndex, $type); + $type->setMapping($mapping['properties']); + } + + // copy documents + return self::reindex($oldIndex, $newIndex, $options); + } +} diff --git a/lib/Elastica/Util.php b/lib/Elastica/Util.php index 74330ef69f..e47252b3ca 100644 --- a/lib/Elastica/Util.php +++ b/lib/Elastica/Util.php @@ -2,7 +2,6 @@ namespace Elastica; -use Elastica\Bulk\Action; /** * Elastica tools * diff --git a/test/lib/Elastica/Test/Tool/CrossIndexTest.php b/test/lib/Elastica/Test/Tool/CrossIndexTest.php new file mode 100644 index 0000000000..bdffc2e8b2 --- /dev/null +++ b/test/lib/Elastica/Test/Tool/CrossIndexTest.php @@ -0,0 +1,136 @@ +_createIndex(null, true, 2); + $this->_addDocs($oldIndex->getType('crossIndexTest'), 10); + + $newIndex = $this->_createIndex(null, true, 2); + + $this->assertInstanceOf( + 'Elastica\Index', + CrossIndex::reindex($oldIndex, $newIndex) + ); + + $this->assertEquals(10, $newIndex->count()); + } + + /** + * Test reindex type option + */ + public function testReindexTypeOption() + { + $oldIndex = $this->_createIndex('', true, 2); + $type1 = $oldIndex->getType('crossIndexTest_1'); + $type2 = $oldIndex->getType('crossIndexTest_2'); + + $docs1 = $this->_addDocs($type1, 10); + $docs2 = $this->_addDocs($type2, 10); + + $newIndex = $this->_createIndex(null, true, 2); + + // \Elastica\Type + CrossIndex::reindex($oldIndex, $newIndex, array( + CrossIndex::OPTION_TYPE => $type1, + )); + $this->assertEquals(10, $newIndex->count()); + $newIndex->deleteDocuments($docs1); + + // string + CrossIndex::reindex($oldIndex, $newIndex, array( + CrossIndex::OPTION_TYPE => 'crossIndexTest_2', + )); + $this->assertEquals(10, $newIndex->count()); + $newIndex->deleteDocuments($docs2); + + // array + CrossIndex::reindex($oldIndex, $newIndex, array( + CrossIndex::OPTION_TYPE => array( + 'crossIndexTest_1', + $type2, + ), + )); + $this->assertEquals(20, $newIndex->count()); + } + + /** + * Test default copy + */ + public function testCopy() + { + $oldIndex = $this->_createIndex(null, true, 2); + $newIndex = $this->_createIndex(null, true, 2); + + $oldType = $oldIndex->getType('copy_test'); + $oldMapping = array( + 'name' => array( + 'type' => 'string', + 'store' => true, + ), + ); + $oldType->setMapping($oldMapping); + $docs = $this->_addDocs($oldType, 10); + + // mapping + $this->assertInstanceOf( + 'Elastica\Index', + CrossIndex::copy($oldIndex, $newIndex) + ); + + $newMapping = $newIndex->getType('copy_test')->getMapping(); + if (!isset($newMapping['copy_test']['properties']['name'])) { + $this->fail('could not request new mapping'); + } + + $this->assertEquals( + $oldMapping['name'], + $newMapping['copy_test']['properties']['name'] + ); + + // document copy + $this->assertEquals(10, $newIndex->count()); + $newIndex->deleteDocuments($docs); + + // ignore mapping + $ignoredType = $oldIndex->getType('copy_test_1'); + $this->_addDocs($ignoredType, 10); + + CrossIndex::copy($oldIndex, $newIndex, array( + CrossIndex::OPTION_TYPE => $oldType, + )); + + $this->assertFalse($newIndex->getType($ignoredType->getName())->exists()); + $this->assertEquals(10, $newIndex->count()); + } + + /** + * @param Type $type + * @param int $docs + * + * @return array + */ + private function _addDocs(Type $type, $docs) + { + $insert = array(); + for ($i = 1; $i <= $docs; $i++) { + $insert[] = new Document($i, array('_id' => $i, 'key' => 'value')); + } + + $type->addDocuments($insert); + $type->getIndex()->refresh(); + + return $insert; + } +} diff --git a/test/lib/Elastica/Test/UtilTest.php b/test/lib/Elastica/Test/UtilTest.php index ba7b6d9e9f..4972eb3f56 100644 --- a/test/lib/Elastica/Test/UtilTest.php +++ b/test/lib/Elastica/Test/UtilTest.php @@ -6,7 +6,6 @@ use Elastica\Request; use Elastica\Test\Base as BaseTest; use Elastica\Util; -use Elastica\Exception\ResponseException; class UtilTest extends BaseTest {