Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend PortfolioOptimization to support integer variables #57

Merged
merged 13 commits into from
Jun 18, 2021
2 changes: 2 additions & 0 deletions .pylintdict
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ paulis
pca
pdf
piecewise
portfoliooptimization
pre
precomputed
qgan
Expand All @@ -96,6 +97,7 @@ representable
rescaling
rescalings
rutkowski
selectable
scholes
scipy
silvio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
# that they have been altered from the originals.

"""An application class for a portfolio optimization problem."""
from typing import List, Union
from typing import List, Tuple, Union, Optional

import numpy as np
from docplex.mp.advmodel import AdvModel

from qiskit_optimization.algorithms import OptimizationResult
from qiskit_optimization.applications import OptimizationApplication
from qiskit_optimization.problems import QuadraticProgram
from qiskit_finance.exceptions import QiskitFinanceError


class PortfolioOptimization(OptimizationApplication):
Expand All @@ -35,18 +36,24 @@ def __init__(
covariances: np.ndarray,
risk_factor: float,
budget: int,
bounds: Optional[List[Tuple[int, int]]] = None,
) -> None:
"""
Args:
expected_returns: The expected returns for the assets.
covariances: The covariances between the assets.
risk_factor: The risk appetite of the decision maker.
budget: The budget, i.e. the number of assets to be selected.
bounds: The list of tuples for the lower bounds and the upper bounds of each variable.
e.g. [(lower bound1, upper bound1), (lower bound2, upper bound2), ...].
Default is None which means all the variables are binary variables.
"""
self._expected_returns = expected_returns
self._covariances = covariances
self._risk_factor = risk_factor
self._budget = budget
self._bounds = bounds
self._check_compatibility(bounds)

def to_quadratic_program(self) -> QuadraticProgram:
"""Convert a portfolio optimization problem instance into a
Expand All @@ -56,9 +63,16 @@ def to_quadratic_program(self) -> QuadraticProgram:
The :class:`~qiskit_optimization.problems.QuadraticProgram` created
from the portfolio optimization problem instance.
"""
self._check_compatibility(self._bounds)
num_assets = len(self._expected_returns)
mdl = AdvModel(name="Portfolio optimization")
x = [mdl.binary_var(name="x_{0}".format(i)) for i in range(num_assets)]
if self.bounds:
x = [
mdl.integer_var(lb=self.bounds[i][0], ub=self.bounds[i][1], name=f"x_{i}")
for i in range(num_assets)
]
else:
x = [mdl.binary_var(name=f"x_{i}") for i in range(num_assets)]
quad = mdl.quad_matrix_sum(self._covariances, x)
linear = np.dot(self._expected_returns, x)
mdl.minimize(self._risk_factor * quad - linear)
Expand Down Expand Up @@ -103,6 +117,35 @@ def interpret(self, result: Union[OptimizationResult, np.ndarray]) -> List[int]:
x = self._result_to_x(result)
return [i for i, x_i in enumerate(x) if x_i]

def _check_compatibility(self, bounds) -> None:
"""Check the compatibility of given variables"""
if len(self._expected_returns) != len(self._covariances) or not all(
len(self._expected_returns) == len(row) for row in self._covariances
):
raise QiskitFinanceError(
"The sizes of expected_returns and covariances do not match. ",
f"expected_returns: {self._expected_returns}, covariances: {self._covariances}.",
)
if bounds is not None:
if (
not isinstance(bounds, list)
or not all(isinstance(lb_, int) for lb_, _ in bounds)
or not all(isinstance(ub_, int) for _, ub_ in bounds)
):
raise QiskitFinanceError(
f"The bounds must be a list of tuples of integers. {bounds}",
)
if any(ub_ < lb_ for lb_, ub_ in bounds):
raise QiskitFinanceError(
"The upper bound of each variable, in the list of bounds, must be larger ",
f"than its lower bound. {bounds}",
)
if len(bounds) != len(self._expected_returns):
raise QiskitFinanceError(
f"The lengths of the bounds, {len(self._bounds)}, do not match to ",
f"the number of types of assets, {len(self._expected_returns)}.",
)

@property
def expected_returns(self) -> np.ndarray:
"""Getter of expected_returns
Expand Down Expand Up @@ -174,3 +217,22 @@ def budget(self, budget: int) -> None:
budget: The budget, i.e. the number of assets to be selected.
"""
self._budget = budget

@property
def bounds(self) -> List[Tuple[int, int]]:
"""Getter of the lower bounds and upper bounds of each selectable assets.

Returns:
The lower bounds and upper bounds of each assets selectable
"""
return self._bounds

@bounds.setter
def bounds(self, bounds: List[Tuple[int, int]]) -> None:
"""Setter of the lower bounds and upper bounds of each selectable assets.

Args:
bounds: The lower bounds and upper bounds of each assets selectable
"""
self._check_compatibility(bounds) # check compatibility before setting bounds
self._bounds = bounds
6 changes: 6 additions & 0 deletions releasenotes/notes/portfolio_intger-f88d116e5b40c6ad.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
:class`qiskit_finance.applications.optimization.PortfolioOptimization` now supports
integer variables. So, users can decide how many asset they buy instead of just tow choices,
woodsp-ibm marked this conversation as resolved.
Show resolved Hide resolved
buy or not.
69 changes: 53 additions & 16 deletions test/applications/test_portfolio_optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
import logging
import unittest
from test import QiskitFinanceTestCase

from ddt import ddt, data, unpack
import numpy as np

from qiskit.utils import algorithm_globals
from qiskit_optimization.problems import QuadraticProgram, VarType
from qiskit_optimization.problems import QuadraticProgram
from qiskit_finance.applications.optimization import PortfolioOptimization
from qiskit_finance.exceptions import QiskitFinanceError

logger = logging.getLogger(__name__)


@ddt
class TestPortfolioDiversification(QiskitFinanceTestCase):
"""Tests Portfolio Diversification application class."""

Expand All @@ -40,16 +43,19 @@ def setUp(self):
[-9.98612132e-05, 4.44816208e-05, -1.23887382e-04, 1.97892585e-04],
]
self.risk_factor = 0.5
self.budget = self.num_assets // 2
self.budget = 2
self.bounds = [(0, 2), (0, 3), (0, 4), (1, 5)]

def assertEqualQuadraticProgram(self, actual, expected):
"""Compare two instances for quadratic program"""
# Test name
self.assertEqual(actual.name, expected.name)
# Test variables
self.assertEqual(actual.get_num_vars(), expected.get_num_vars())
for var in actual.variables:
self.assertEqual(var.vartype, VarType.BINARY)
for var1, var2 in zip(actual.variables, actual.variables):
self.assertEqual(var1.vartype, var2.vartype)
self.assertEqual(var1.lowerbound, var2.lowerbound)
self.assertEqual(var1.upperbound, var2.upperbound)
# Test objective
self.assertEqual(actual.objective.sense, expected.objective.sense)
self.assertEqual(actual.objective.constant, expected.objective.constant)
Expand All @@ -64,25 +70,16 @@ def assertEqualQuadraticProgram(self, actual, expected):
self.assertEqual(act_lin.rhs, exp_lin.rhs)
self.assertEqual(act_lin.linear.to_dict(), exp_lin.linear.to_dict())

# """ Compares the dags after unrolling to basis """
# circuit_dag = circuit_to_dag(circuit)
# expected_dag = circuit_to_dag(expected)

# circuit_result = Unroller(basis).run(circuit_dag)
# expected_result = Unroller(basis).run(expected_dag)

# self.assertEqual(circuit_result, expected_result)

def test_to_quadratic_program(self):
"""Test to_quadratic_program"""
# When bounds is None
portfolio_optimization = PortfolioOptimization(
self.expected_returns, self.covariances, self.risk_factor, self.budget
)
actual_op = portfolio_optimization.to_quadratic_program()

expected_op = QuadraticProgram(name="Portfolio optimization")
for i in range(self.num_assets):
expected_op.binary_var(name="x_{0}".format(i))
expected_op.binary_var(name=f"x_{i}")
quadratic = {
(i, j): self.risk_factor * self.covariances[i][j]
for i in range(self.num_assets)
Expand All @@ -93,6 +90,38 @@ def test_to_quadratic_program(self):
linear = {i: 1 for i in range(self.num_assets)}
expected_op.linear_constraint(linear=linear, sense="==", rhs=self.budget)
self.assertEqualQuadraticProgram(actual_op, expected_op)
# When bounds is provided
portfolio_optimization = PortfolioOptimization(
self.expected_returns, self.covariances, self.risk_factor, self.budget, self.bounds
)
actual_op = portfolio_optimization.to_quadratic_program()
expected_op = QuadraticProgram(name="Portfolio optimization")
for i in range(self.num_assets):
expected_op.integer_var(
lowerbound=self.bounds[i][0], upperbound=self.bounds[i][1], name=f"x_{i}"
)
quadratic = {
(i, j): self.risk_factor * self.covariances[i][j]
for i in range(self.num_assets)
for j in range(self.num_assets)
}
linear = {i: -self.expected_returns[i] for i in range(self.num_assets)}
expected_op.minimize(quadratic=quadratic, linear=linear)
linear = {i: 1 for i in range(self.num_assets)}
expected_op.linear_constraint(linear=linear, sense="==", rhs=self.budget)
self.assertEqualQuadraticProgram(actual_op, expected_op)

@data(
[[1], [[1], [1]], 0.5, 2, None],
[[1], [[1, 1]], 0.5, 2, None],
[[1, 2], [[1, 2], [3, 4]], 0.5, 2, [(0, 2), (3, 1)]],
[[1, 2], [[1, 2], [3, 4]], 0.5, 2, [(0, 2), (0, 2), (0, 2)]],
)
@unpack
def test_is_compatibility(self, expected_returns, covariances, risk_factor, budget, bounds):
"""Test error cases in _is_compatibility"""
with self.assertRaises(QiskitFinanceError):
_ = PortfolioOptimization(expected_returns, covariances, risk_factor, budget, bounds)

def test_interpret(self):
"""test interpret"""
Expand Down Expand Up @@ -136,6 +165,14 @@ def test_budget(self):
portfolio_optimization.budget = 3
self.assertEqual(portfolio_optimization.budget, 3)

def test_bounds(self):
"""test bounds"""
portfolio_optimization = PortfolioOptimization(
self.expected_returns, self.covariances, self.risk_factor, self.budget, self.bounds
)
portfolio_optimization.bounds = [(0, 4), (0, 4), (0, 4), (1, 4)]
self.assertEqual(portfolio_optimization.bounds, [(0, 4), (0, 4), (0, 4), (1, 4)])


if __name__ == "__main__":
unittest.main()