Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give ui/courier/fetch a makeover #6307

Merged
merged 12 commits into from
Mar 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/ui/public/courier/fetch/__tests__/doc.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import sinon from 'auto-release-sinon';
import expect from 'expect.js';
import ngMock from 'ngMock';
import CourierDataSourceDocSourceProvider from 'ui/courier/data_source/doc_source';
import CourierFetchRequestDocProvider from 'ui/courier/fetch/request/doc';

describe('Courier DocFetchRequest class', function () {
import DocSourceProvider from '../../data_source/doc_source';
import DocRequestProvider from '../request/doc';

var storage;
var source;
var defer;
var req;
describe('Courier DocFetchRequest class', function () {
let storage;
let source;
let defer;
let req;

var setVersion;
let setVersion;

beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject(function (Private, Promise, $injector) {
var DocSource = Private(CourierDataSourceDocSourceProvider);
var DocFetchRequest = Private(CourierFetchRequestDocProvider);
const DocSource = Private(DocSourceProvider);
const DocFetchRequest = Private(DocRequestProvider);

storage =
$injector.get('localStorage').store =
Expand Down
11 changes: 6 additions & 5 deletions src/ui/public/courier/fetch/__tests__/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import ngMock from 'ngMock';
import expect from 'expect.js';
import sinon from 'auto-release-sinon';

import FetchProvider from 'ui/courier/fetch';
import IndexPatternProvider from 'fixtures/stubbed_logstash_index_pattern';
import searchResp from 'fixtures/search_response';
import CourierDataSourceDocSourceProvider from 'ui/courier/data_source/doc_source';
import CourierDataSourceSearchSourceProvider from 'ui/courier/data_source/search_source';

import FetchProvider from '../fetch';
import DocSourceProvider from '../../data_source/doc_source';
import SearchSourceProvider from '../../data_source/search_source';

describe('Fetch service', function () {
require('testUtils/noDigestPromises').activateForSuite();
Expand All @@ -24,8 +25,8 @@ describe('Fetch service', function () {
Promise = $injector.get('Promise');
fetch = Private(FetchProvider);
indexPattern = Private(IndexPatternProvider);
DocSource = Private(CourierDataSourceDocSourceProvider);
SearchSource = Private(CourierDataSourceSearchSourceProvider);
DocSource = Private(DocSourceProvider);
SearchSource = Private(SearchSourceProvider);
}));

describe('#doc(docSource)', function () {
Expand Down
13 changes: 7 additions & 6 deletions src/ui/public/courier/fetch/__tests__/fetch_these.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import _ from 'lodash';
import sinon from 'auto-release-sinon';
import expect from 'expect.js';
import ngMock from 'ngMock';
import CourierFetchFetchTheseProvider from 'ui/courier/fetch/_fetch_these';

import FetchTheseProvider from '../fetch_these';

describe('ui/courier/fetch/_fetch_these', () => {

let Promise;
Expand All @@ -22,15 +23,15 @@ describe('ui/courier/fetch/_fetch_these', () => {
return fakeResponses;
}

PrivateProvider.swap(require('ui/courier/fetch/_call_client'), FakeResponsesProvider);
PrivateProvider.swap(require('ui/courier/fetch/_call_response_handlers'), FakeResponsesProvider);
PrivateProvider.swap(require('ui/courier/fetch/_continue_incomplete'), FakeResponsesProvider);
PrivateProvider.swap(require('ui/courier/fetch/call_client'), FakeResponsesProvider);
PrivateProvider.swap(require('ui/courier/fetch/call_response_handlers'), FakeResponsesProvider);
PrivateProvider.swap(require('ui/courier/fetch/continue_incomplete'), FakeResponsesProvider);
}));

beforeEach(ngMock.inject((Private, $injector) => {
$rootScope = $injector.get('$rootScope');
Promise = $injector.get('Promise');
fetchThese = Private(CourierFetchFetchTheseProvider);
fetchThese = Private(FetchTheseProvider);
request = mockRequest();
requests = [ request ];
}));
Expand Down
9 changes: 0 additions & 9 deletions src/ui/public/courier/fetch/_is_request.js

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import _ from 'lodash';
import CourierFetchIsRequestProvider from 'ui/courier/fetch/_is_request';
import CourierFetchMergeDuplicateRequestsProvider from 'ui/courier/fetch/_merge_duplicate_requests';
import CourierFetchReqStatusProvider from 'ui/courier/fetch/_req_status';

import IsRequestProvider from './is_request';
import MergeDuplicatesRequestProvider from './merge_duplicate_requests';
import ReqStatusProvider from './req_status';

export default function CourierFetchCallClient(Private, Promise, es, esShardTimeout, sessionId) {

var isRequest = Private(CourierFetchIsRequestProvider);
var mergeDuplicateRequests = Private(CourierFetchMergeDuplicateRequestsProvider);
const isRequest = Private(IsRequestProvider);
const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider);

var ABORTED = Private(CourierFetchReqStatusProvider).ABORTED;
var DUPLICATE = Private(CourierFetchReqStatusProvider).DUPLICATE;
const ABORTED = Private(ReqStatusProvider).ABORTED;
const DUPLICATE = Private(ReqStatusProvider).DUPLICATE;

function callClient(strategy, requests) {
// merging docs can change status to DUPLICATE, capture new statuses
var statuses = mergeDuplicateRequests(requests);
const statuses = mergeDuplicateRequests(requests);

// get the actual list of requests that we will be fetching
var executable = statuses.filter(isRequest);
var execCount = executable.length;
const executable = statuses.filter(isRequest);
let execCount = executable.length;

// resolved by respond()
var esPromise;
var defer = Promise.defer();
let esPromise;
const defer = Promise.defer();

// for each respond with either the response or ABORTED
var respond = function (responses) {
const respond = function (responses) {
responses = responses || [];
return Promise.map(requests, function (req, i) {
switch (statuses[i]) {
Expand All @@ -43,7 +45,7 @@ export default function CourierFetchCallClient(Private, Promise, es, esShardTime


// handle a request being aborted while being fetched
var requestWasAborted = Promise.method(function (req, i) {
const requestWasAborted = Promise.method(function (req, i) {
if (statuses[i] === ABORTED) {
defer.reject(new Error('Request was aborted twice?'));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { SearchTimeout } from 'ui/errors';
import { RequestFailure } from 'ui/errors';
import { ShardFailure } from 'ui/errors';
import CourierFetchReqStatusProvider from 'ui/courier/fetch/_req_status';
import CourierFetchNotifierProvider from 'ui/courier/fetch/_notifier';
import { RequestFailure, SearchTimeout, ShardFailure } from 'ui/errors';

import ReqStatusProvider from './req_status';
import NotifierProvider from './notifier';

export default function CourierFetchCallResponseHandlers(Private, Promise) {
var ABORTED = Private(CourierFetchReqStatusProvider).ABORTED;
var INCOMPLETE = Private(CourierFetchReqStatusProvider).INCOMPLETE;
var notify = Private(CourierFetchNotifierProvider);
const ABORTED = Private(ReqStatusProvider).ABORTED;
const INCOMPLETE = Private(ReqStatusProvider).INCOMPLETE;
const notify = Private(NotifierProvider);


function callResponseHandlers(requests, responses) {
Expand All @@ -15,7 +15,7 @@ export default function CourierFetchCallResponseHandlers(Private, Promise) {
return ABORTED;
}

var resp = responses[i];
let resp = responses[i];

if (resp.timed_out) {
notify.warning(new SearchTimeout());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import CourierFetchReqStatusProvider from 'ui/courier/fetch/_req_status';
import ReqStatusProvider from './req_status';

export default function CourierFetchContinueIncompleteRequests(Private) {
var INCOMPLETE = Private(CourierFetchReqStatusProvider).INCOMPLETE;
const INCOMPLETE = Private(ReqStatusProvider).INCOMPLETE;

function continueIncompleteRequests(strategy, requests, responses, fetchWithStrategy) {
var incomplete = [];
const incomplete = [];

responses.forEach(function (resp, i) {
if (resp === INCOMPLETE) {
Expand Down
22 changes: 12 additions & 10 deletions src/ui/public/courier/fetch/fetch.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
import _ from 'lodash';
import CourierRequestQueueProvider from 'ui/courier/_request_queue';
import CourierFetchFetchTheseProvider from 'ui/courier/fetch/_fetch_these';
import CourierFetchCallResponseHandlersProvider from 'ui/courier/fetch/_call_response_handlers';
import CourierFetchReqStatusProvider from 'ui/courier/fetch/_req_status';

import RequestQueueProvider from '../_request_queue';
import FetchTheseProvider from './fetch_these';
import CallResponseHandlersProvider from './call_response_handlers';
import ReqStatusProvider from './req_status';

export default function fetchService(Private, Promise) {

var requestQueue = Private(CourierRequestQueueProvider);
var fetchThese = Private(CourierFetchFetchTheseProvider);
const requestQueue = Private(RequestQueueProvider);
const fetchThese = Private(FetchTheseProvider);

var callResponseHandlers = Private(CourierFetchCallResponseHandlersProvider);
var INCOMPLETE = Private(CourierFetchReqStatusProvider).INCOMPLETE;
const callResponseHandlers = Private(CallResponseHandlersProvider);
const INCOMPLETE = Private(ReqStatusProvider).INCOMPLETE;

function fetchQueued(strategy) {
var requests = requestQueue.getStartable(strategy);
const requests = requestQueue.getStartable(strategy);
if (!requests.length) return Promise.resolve();
else return fetchThese(requests);
}

this.fetchQueued = fetchQueued;

function fetchASource(source, strategy) {
var defer = Promise.defer();
const defer = Promise.defer();

fetchThese([
source._createRequest(defer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import CourierFetchNotifierProvider from 'ui/courier/fetch/_notifier';
import CourierFetchForEachStrategyProvider from 'ui/courier/fetch/_for_each_strategy';
import CourierFetchCallClientProvider from 'ui/courier/fetch/_call_client';
import CourierFetchCallResponseHandlersProvider from 'ui/courier/fetch/_call_response_handlers';
import CourierFetchContinueIncompleteProvider from 'ui/courier/fetch/_continue_incomplete';
import CourierFetchReqStatusProvider from 'ui/courier/fetch/_req_status';
import NotifierProvider from './notifier';
import ForEachStrategyProvider from './for_each_strategy';
import CallClientProvider from './call_client';
import CallResponseHandlersProvider from './call_response_handlers';
import ContinueIncompleteProvider from './continue_incomplete';
import ReqStatusProvider from './req_status';

export default function FetchTheseProvider(Private, Promise) {
var notify = Private(CourierFetchNotifierProvider);
var forEachStrategy = Private(CourierFetchForEachStrategyProvider);
const notify = Private(NotifierProvider);
const forEachStrategy = Private(ForEachStrategyProvider);

// core tasks
var callClient = Private(CourierFetchCallClientProvider);
var callResponseHandlers = Private(CourierFetchCallResponseHandlersProvider);
var continueIncomplete = Private(CourierFetchContinueIncompleteProvider);
const callClient = Private(CallClientProvider);
const callResponseHandlers = Private(CallResponseHandlersProvider);
const continueIncomplete = Private(ContinueIncompleteProvider);

var ABORTED = Private(CourierFetchReqStatusProvider).ABORTED;
var DUPLICATE = Private(CourierFetchReqStatusProvider).DUPLICATE;
var INCOMPLETE = Private(CourierFetchReqStatusProvider).INCOMPLETE;
const ABORTED = Private(ReqStatusProvider).ABORTED;
const DUPLICATE = Private(ReqStatusProvider).DUPLICATE;
const INCOMPLETE = Private(ReqStatusProvider).INCOMPLETE;

function fetchThese(requests) {
return forEachStrategy(requests, function (strategy, reqsForStrategy) {
Expand Down Expand Up @@ -66,7 +66,7 @@ export default function FetchTheseProvider(Private, Promise) {
}

return new Promise(function (resolve) {
var action = req.started ? req.continue : req.start;
const action = req.started ? req.continue : req.start;
resolve(action.call(req));
})
.catch(err => req.handleFailure(err));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import _ from 'lodash';
export default function FetchForEachRequestStrategy(Private, Promise) {

export default function FetchForEachRequestStrategy(Private, Promise) {
function forEachStrategy(requests, block) {
block = Promise.method(block);
var sets = [];
const sets = [];

requests.forEach(function (req) {
var strategy = req.strategy;
var set = _.find(sets, { 0: strategy });
const strategy = req.strategy;
const set = _.find(sets, { 0: strategy });
if (set) set[1].push(req);
else sets.push([strategy, [req]]);
});
Expand Down
9 changes: 9 additions & 0 deletions src/ui/public/courier/fetch/is_request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import AbstractRequestProvider from './request';

export default function IsRequestProvider(Private) {
const AbstractRequest = Private(AbstractRequestProvider);

return function isRequest(obj) {
return obj instanceof AbstractRequest;
};
};
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import CourierFetchIsRequestProvider from 'ui/courier/fetch/_is_request';
import CourierFetchReqStatusProvider from 'ui/courier/fetch/_req_status';
import IsRequestProvider from './is_request';
import ReqStatusProvider from './req_status';

export default function FetchMergeDuplicateRequests(Private) {
var isRequest = Private(CourierFetchIsRequestProvider);
var DUPLICATE = Private(CourierFetchReqStatusProvider).DUPLICATE;
const isRequest = Private(IsRequestProvider);
const DUPLICATE = Private(ReqStatusProvider).DUPLICATE;

function mergeDuplicateRequests(requests) {
// dedupe requests
var index = {};
const index = {};
return requests.map(function (req) {
if (!isRequest(req)) return req;

var iid = req.source._instanceid;
const iid = req.source._instanceid;
if (!index[iid]) {
// this request is unique so far
index[iid] = req;
Expand Down
11 changes: 6 additions & 5 deletions src/ui/public/courier/fetch/request/__tests__/segmented.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import sinon from 'auto-release-sinon';
import expect from 'expect.js';
import ngMock from 'ngMock';
import CourierFetchRequestSegmentedProvider from 'ui/courier/fetch/request/segmented';
import CourierFetchRequestSearchProvider from 'ui/courier/fetch/request/search';
describe('ui/courier/fetch/request/segmented', () => {

import SegmentedRequestProvider from '../segmented';
import SearchRequestProvider from '../search';

describe('ui/courier/fetch/request/segmented', () => {
let Promise;
let $rootScope;
let SegmentedReq;
Expand All @@ -16,8 +17,8 @@ describe('ui/courier/fetch/request/segmented', () => {
beforeEach(ngMock.inject((Private, $injector) => {
Promise = $injector.get('Promise');
$rootScope = $injector.get('$rootScope');
SegmentedReq = Private(CourierFetchRequestSegmentedProvider);
searchReqStart = sinon.spy(Private(CourierFetchRequestSearchProvider).prototype, 'start');
SegmentedReq = Private(SegmentedRequestProvider);
searchReqStart = sinon.spy(Private(SearchRequestProvider).prototype, 'start');
}));

describe('#start()', () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import sinon from 'auto-release-sinon';
import expect from 'expect.js';
import ngMock from 'ngMock';

import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source';
import CourierFetchRequestSegmentedProvider from 'ui/courier/fetch/request/segmented';

import SegmentedRequestProvider from '../segmented';

describe('ui/courier/fetch/request/segmented/_createQueue', () => {

let Promise;
Expand All @@ -16,7 +19,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => {
beforeEach(ngMock.inject((Private, $injector) => {
Promise = $injector.get('Promise');
$rootScope = $injector.get('$rootScope');
SegmentedReq = Private(CourierFetchRequestSegmentedProvider);
SegmentedReq = Private(SegmentedRequestProvider);

MockSource = class {
constructor() {
Expand All @@ -29,7 +32,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => {
const req = new SegmentedReq(new MockSource());
req._queueCreated = null;

var promise = req._createQueue();
const promise = req._createQueue();
expect(req._queueCreated).to.be(false);
await promise;
expect(req._queueCreated).to.be(true);
Expand Down
Loading