Skip to content

Commit

Permalink
Merge branch 'webdevsHub-ScanAndScroll'
Browse files Browse the repository at this point in the history
  • Loading branch information
ruflin committed May 25, 2014
2 parents 99984fe + ee541b6 commit 83fd5c4
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 0 deletions.
1 change: 1 addition & 0 deletions changes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CHANGES

2014-05-25
- Added Guzzle transport as an alternative to the default Http transport #618
- Added Elastica\ScanAndScroll Iterator (http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/scan-scroll.html) #617

2014-05-13
- Add JSON compat library; Elasticsearch JSON flags and nicer error handling #614
Expand Down
150 changes: 150 additions & 0 deletions lib/Elastica/ScanAndScroll.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php

namespace Elastica;

/**
* Iterator that starts a scan search and then scrolls through it, yielding one ResultSet per scroll request
*
* @category Xodoa
* @package Elastica
* @author Manuel Andreo Garcia <[email protected]>
* @link http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/scan-scroll.html
*/
class ScanAndScroll implements \Iterator {

    /**
     * Time value sent as the "scroll" option of the search
     *
     * @link http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html
     * @var string
     */
    public $expiryTime;

    /**
     * Size applied to the query of the search before the scan request is sent
     *
     * @var int
     */
    public $sizePerShard;

    /**
     * Search object used for the scan request and every scroll request
     *
     * @var Search
     */
    protected $_search;

    /**
     * Scroll id that will be sent with the next scroll request
     *
     * @var null|string
     */
    protected $_nextScrollId = null;

    /**
     * Scroll id that was pending before the most recent response was processed
     *
     * @var null|string
     */
    protected $_lastScrollId = null;

    /**
     * Result set returned by the most recent scroll request
     *
     * @var null|ResultSet
     */
    protected $_currentResultSet = null;

    /**
     * Constructs scroll iterator object
     *
     * No request is sent here; the scan starts on rewind().
     *
     * @param Search $search
     * @param string $expiryTime
     * @param int $sizePerShard
     */
    public function __construct(Search $search, $expiryTime = '1m', $sizePerShard = 1000) {
        $this->_search = $search;
        $this->expiryTime = $expiryTime;
        $this->sizePerShard = $sizePerShard;
    }

    /**
     * Return the current result set
     *
     * @link http://php.net/manual/en/iterator.current.php
     * @return ResultSet|null null before the first rewind() or when no scroll request could be sent
     */
    public function current() {
        return $this->_currentResultSet;
    }

    /**
     * Perform next scroll search
     *
     * @link http://php.net/manual/en/iterator.next.php
     * @return void
     */
    public function next() {
        $this->_scroll();
    }

    /**
     * Return the scroll id that was sent with the request which produced the current result set
     *
     * @link http://php.net/manual/en/iterator.key.php
     * @return string|null
     */
    public function key() {
        return $this->_lastScrollId;
    }

    /**
     * Returns true if a follow-up scroll id is known and the current result set contains at least one hit
     *
     * @link http://php.net/manual/en/iterator.valid.php
     * @return boolean
     */
    public function valid() {
        return
            $this->_nextScrollId !== null
            && $this->_currentResultSet !== null
            && $this->_currentResultSet->count() > 0;
    }

    /**
     * Start the initial scan search and fetch the first result set
     *
     * Note that this modifies the Search object passed to the constructor:
     * the size of its query and its search type / scroll options are overwritten.
     *
     * @link http://php.net/manual/en/iterator.rewind.php
     * @throws \Elastica\Exception\InvalidException
     * @return void
     */
    public function rewind() {
        // forget everything a previous iteration left behind so that a
        // failing scan request can not expose stale results
        $this->_nextScrollId = null;
        $this->_lastScrollId = null;
        $this->_currentResultSet = null;

        $this->_search->getQuery()->setSize($this->sizePerShard);

        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCAN);
        $this->_search->setOption(Search::OPTION_SCROLL, $this->expiryTime);

        // _scroll() stores the scroll id on the shared Search object; without
        // clearing it, iterating a second time would reuse the old scroll id
        // instead of starting a fresh scan.
        // NOTE(review): assumes Search treats a null scroll id as "not set" - confirm
        $this->_search->setOption(Search::OPTION_SCROLL_ID, null);

        // initial scan request
        $this->_setScrollId($this->_search->search());

        // trigger first scroll request
        $this->_scroll();
    }

    /**
     * Perform next scroll search
     *
     * Does nothing but clear the current result set when there is no scroll id
     * to continue from (e.g. the previous response was not ok).
     *
     * @throws \Elastica\Exception\InvalidException
     * @return void
     */
    protected function _scroll() {
        if ($this->_nextScrollId === null) {
            // without a scroll id there is nothing to scroll; do not send a request
            $this->_currentResultSet = null;

            return;
        }

        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCROLL);
        $this->_search->setOption(Search::OPTION_SCROLL_ID, $this->_nextScrollId);

        $resultSet = $this->_search->search();
        $this->_currentResultSet = $resultSet;
        $this->_setScrollId($resultSet);
    }

    /**
     * Save last scroll id and extract the new one if possible
     *
     * The next scroll id stays null when the response is not ok.
     *
     * @param ResultSet $resultSet
     */
    protected function _setScrollId(ResultSet $resultSet) {
        $this->_lastScrollId = $this->_nextScrollId;

        $this->_nextScrollId = null;
        if ($resultSet->getResponse()->isOk()) {
            $this->_nextScrollId = $resultSet->getResponse()->getScrollId();
        }
    }

}
12 changes: 12 additions & 0 deletions lib/Elastica/Search.php
Original file line number Diff line number Diff line change
Expand Up @@ -499,4 +499,16 @@ public function setSuggest(Suggest $suggest)
{
return $this->setOptionsAndQuery(array(self::OPTION_SEARCH_TYPE_SUGGEST => 'suggest'), $suggest);
}

/**
* Returns the ScanAndScroll Iterator
*
* @see Elastica\ScanAndScroll
* @param string $expiryTime
* @param int $sizePerShard
* @return ScanAndScroll
*/
public function scanAndScroll($expiryTime = '1m', $sizePerShard = 1000) {
    // wrap this search in an iterator; no request is sent until it is rewound
    $iterator = new ScanAndScroll($this, $expiryTime, $sizePerShard);

    return $iterator;
}
}
114 changes: 114 additions & 0 deletions test/lib/Elastica/Test/ScanAndScrollTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

namespace Elastica\Test;

use Elastica\Document;
use Elastica\Query;
use Elastica\ResultSet;
use Elastica\ScanAndScroll;
use Elastica\Search;
use Elastica\Test\Base as BaseTest;

class ScanAndScrollTest extends BaseTest {

    /**
     * The iterator can be constructed from a plain search
     */
    public function testConstruct() {
        $iterator = $this->_prepareScanAndScroll();

        $this->assertInstanceOf('Elastica\ScanAndScroll', $iterator);
    }

    /**
     * Without explicit arguments the iterator uses '1m' and 1000
     */
    public function testDefaultProperties() {
        $iterator = $this->_prepareScanAndScroll();

        $this->assertEquals('1m', $iterator->expiryTime);
        $this->assertEquals(1000, $iterator->sizePerShard);
    }

    /**
     * rewind() replaces the size of the query with sizePerShard
     */
    public function testQuerySizeOverride() {
        $query = new Query();
        $query->setSize(100);

        $index = $this->_createIndex('test_1');
        $type = $index->getType('scanAndScrollTest');

        $client = $this->_getClient();
        $search = new Search($client);
        $search->addIndex($index)->addType($type);
        $search->setQuery($query);

        $iterator = new ScanAndScroll($search);
        $iterator->sizePerShard = 10;
        $iterator->rewind();

        $actualSize = $query->getParam('size');
        $this->assertEquals(10, $actualSize);
    }

    /**
     * With 2 shards and 5 hits per shard the first scroll returns 10 hits
     */
    public function testSizePerShard() {
        $iterator = new ScanAndScroll($this->_prepareSearch('test_2', 2, 20));
        $iterator->sizePerShard = 5;
        $iterator->rewind();

        $firstResultSet = $iterator->current();
        $this->assertEquals(10, $firstResultSet->count());
    }

    /**
     * After rewind() the key equals the scroll id of the current response
     */
    public function testScrollId() {
        $iterator = new ScanAndScroll($this->_prepareSearch('test_3', 1, 2));
        $iterator->sizePerShard = 1;

        $iterator->rewind();

        $expectedScrollId = $iterator->current()->getResponse()->getScrollId();
        $this->assertEquals($expectedScrollId, $iterator->key());
    }

    /**
     * Iterating with foreach yields every non-empty scroll exactly once
     */
    public function testForeach() {
        $iterator = new ScanAndScroll($this->_prepareSearch('test_4', 2, 11));
        $iterator->sizePerShard = 5;

        // 11 documents, 2 shards, 5 hits per shard:
        //   scroll 1 returns 10 hits,
        //   scroll 2 returns 1 hit,
        //   scroll 3 returns 0 hits and therefore ends the loop

        $scrolls = 0;
        foreach ($iterator as $resultSet) {
            /** @var ResultSet $resultSet */
            $scrolls++;

            if ($scrolls == 1) {
                $this->assertEquals(10, $resultSet->count());
            } elseif ($scrolls == 2) {
                $this->assertEquals(1, $resultSet->count());
            }
        }

        $this->assertEquals(2, $scrolls);
    }

    /**
     * Build an iterator on a search without index and type
     *
     * @return ScanAndScroll
     */
    private function _prepareScanAndScroll() {
        $search = new Search($this->_getClient());

        return new ScanAndScroll($search);
    }

    /**
     * Create an index, fill it with documents and return a search on it
     *
     * @param string $indexName
     * @param int $indexShards
     * @param int $docs number of documents to insert, ids run from 1 to $docs
     * @return Search
     */
    private function _prepareSearch($indexName, $indexShards, $docs) {
        $index = $this->_createIndex($indexName, true, $indexShards);
        $type = $index->getType('scanAndScrollTest');

        $documents = array();
        $id = 1;
        while ($id <= $docs) {
            $documents[] = new Document($id, array('id' => $id, 'key' => 'value'));
            $id++;
        }

        $type->addDocuments($documents);
        // refresh the index before searching it
        $index->refresh();

        $search = new Search($this->_getClient());
        $search->addIndex($index)->addType($type);

        return $search;
    }
}
5 changes: 5 additions & 0 deletions test/lib/Elastica/Test/SearchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,9 @@ public function testCount() {
$this->assertInstanceOf('\Elastica\ResultSet', $result2);
$this->assertEquals(1, $result2->getTotalHits());
}

public function testScanAndScroll() {
    // Search::scanAndScroll() has to hand back a ScanAndScroll iterator
    $client = $this->_getClient();
    $search = new Search($client);
    $iterator = $search->scanAndScroll();

    $this->assertInstanceOf('Elastica\ScanAndScroll', $iterator);
}
}

0 comments on commit 83fd5c4

Please sign in to comment.