Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ConnectionPool with master discovery #207

Merged
merged 3 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ jobs:
- '2.8'
- '2.x-latest'
python:
- '2.7'
Totktonada marked this conversation as resolved.
Show resolved Hide resolved
- '3.5'
- '3.6'
- '3.7'
Expand Down Expand Up @@ -116,7 +115,6 @@ jobs:
- '1.10'
- '2.8'
python:
- '2.7'
- '3.10'

steps:
Expand Down
39 changes: 39 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,47 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Reusable testing workflow for integration with tarantool artifacts
(PR #192).
- Connection pool with master discovery (PR #211, #196).

ConnectionPool is supported only for Python 3.7 or newer.
Authenticated user must be able to call `box.info` on instances.

ConnectionPool updates information about each server state (RO/RW)
on initial connect and then asynchronously in separate threads.
Application retries must be written considering the asynchronous nature
of cluster state refresh. User does not need to use any synchronization
mechanisms in requests, it's all handled with ConnectionPool methods.

ConnectionPool API is the same as a plain Connection API.
On each request, a connection is chosen to execute this request.
A connection is chosen based on a request mode:
* Mode.ANY chooses any instance.
* Mode.RW chooses an RW instance.
* Mode.RO chooses an RO instance.
* Mode.PREFER_RW chooses an RW instance, if possible, RO instance
otherwise.
* Mode.PREFER_RO chooses an RO instance, if possible, RW instance
otherwise.
All requests that are guaranteed to write (insert, replace, delete,
upsert, update) use RW mode by default. select uses ANY by default. You
can set the mode explicitly. call, eval, execute and ping requests
require to set the mode explicitly.

Example:
```python
pool = tarantool.ConnectionPool(
addrs=[
{'host': '108.177.16.0', 'port': 3301},
{'host': '108.177.16.0', 'port': 3302},
],
user='test',
password='test',)

pool.call('some_write_procedure', arg, mode=tarantool.Mode.RW)
```

### Changed
- **Breaking**: drop Python 2 support (PR #207).
- **Breaking**: change binary types encode/decode for Python 3
to support working with varbinary (PR #211, #105).
With Python 2 the behavior of the connector remains the same.
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,6 @@ def find_version(*file_paths):
command_options=command_options,
install_requires=[
'msgpack>=0.4.0',
]
],
python_requires='>=3',
)
7 changes: 7 additions & 0 deletions tarantool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0301,W0105,W0401,W0614

import sys

from tarantool.connection import Connection
from tarantool.mesh_connection import MeshConnection
from tarantool.const import (
Expand Down Expand Up @@ -76,3 +78,8 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None,
__all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema',
'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning',
'SchemaError', 'dbapi']

# ConnectionPool is supported only for Python 3.7 or newer.
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
if sys.version_info.major >= 3 and sys.version_info.minor >= 7:
from tarantool.connection_pool import ConnectionPool, Mode
__all__.extend(['ConnectionPool', 'Mode'])
87 changes: 86 additions & 1 deletion tarantool/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import errno
import socket
import abc

import ctypes
import ctypes.util
Expand Down Expand Up @@ -76,8 +77,92 @@
ENCODING_DEFAULT,
)

# Based on https://realpython.com/python-interface/
class ConnectionInterface(metaclass=abc.ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
return (hasattr(subclass, 'close') and
callable(subclass.close) and
hasattr(subclass, 'is_closed') and
callable(subclass.is_closed) and
hasattr(subclass, 'connect') and
callable(subclass.connect) and
hasattr(subclass, 'call') and
callable(subclass.call) and
hasattr(subclass, 'eval') and
callable(subclass.eval) and
hasattr(subclass, 'replace') and
callable(subclass.replace) and
hasattr(subclass, 'insert') and
callable(subclass.insert) and
hasattr(subclass, 'delete') and
callable(subclass.delete) and
hasattr(subclass, 'upsert') and
callable(subclass.upsert) and
hasattr(subclass, 'update') and
callable(subclass.update) and
hasattr(subclass, 'ping') and
callable(subclass.ping) and
hasattr(subclass, 'select') and
callable(subclass.select) and
hasattr(subclass, 'execute') and
callable(subclass.execute) or
NotImplemented)

@abc.abstractmethod
def close(self):
raise NotImplementedError

@abc.abstractmethod
def is_closed(self):
raise NotImplementedError

@abc.abstractmethod
def connect(self):
raise NotImplementedError

@abc.abstractmethod
def call(self, func_name, *args, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def eval(self, expr, *args, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def replace(self, space_name, values):
raise NotImplementedError

@abc.abstractmethod
def insert(self, space_name, values):
raise NotImplementedError

@abc.abstractmethod
def delete(self, space_name, key, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def upsert(self, space_name, tuple_value, op_list, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def update(self, space_name, key, op_list, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def ping(self, notime):
raise NotImplementedError

@abc.abstractmethod
def select(self, space_name, key, **kwargs):
raise NotImplementedError

@abc.abstractmethod
def execute(self, query, params, **kwargs):
raise NotImplementedError


class Connection(object):
class Connection(ConnectionInterface):
'''
Represents connection to the Tarantool server.

Expand Down
Loading