Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rebased to 4.3.1] #488

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions databuilder/extractor/neo4j_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,22 @@ def _get_driver(self) -> Any:
trust = neo4j.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES if self.conf.get_bool(Neo4jExtractor.NEO4J_VALIDATE_SSL) \
else neo4j.TRUST_ALL_CERTIFICATES
return GraphDatabase.driver(self.graph_url,
max_connection_life_time=self.conf.get_int(
max_connection_lifetime=self.conf.get_int(
Neo4jExtractor.NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_USER),
self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_PW)),
encrypted=self.conf.get_bool(Neo4jExtractor.NEO4J_ENCRYPTED),
trust=trust)
#trust=trust
)

def _execute_query(self, tx: Any) -> Any:
"""
Create an iterator to execute sql.
"""
LOGGER.info('Executing query %s', self.cypher_query)
result = tx.run(self.cypher_query)
return result
#return result
return [record for record in result]

def _get_extract_iter(self) -> Iterator[Any]:
"""
Expand Down
15 changes: 9 additions & 6 deletions databuilder/publisher/neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import pandas
from jinja2 import Template
from neo4j import GraphDatabase, Transaction
from neo4j.exceptions import CypherError, TransientError
from neo4j.exceptions import Neo4jError
from pyhocon import ConfigFactory, ConfigTree
from typing import Set, List

from databuilder.publisher.base_publisher import Publisher
from databuilder.publisher.neo4j_preprocessor import NoopRelationPreprocessor
Expand Down Expand Up @@ -137,11 +138,13 @@ def init(self, conf: ConfigTree) -> None:
else neo4j.TRUST_ALL_CERTIFICATES
self._driver = \
GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY),
max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
max_connection_lifetime=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)),
encrypted=conf.get_bool(NEO4J_ENCRYPTED),
trust=trust)
self._transaction_size = conf.get_int(NEO4J_TRANSACTION_SIZE)
#trust=trust
)
self._transaction_size = conf.get_int(NEO4J_TRANSCATION_SIZE)

self._session = self._driver.session()
self._confirm_rel_created = conf.get_bool(NEO4J_RELATIONSHIP_CREATION_CONFIRM)

Expand Down Expand Up @@ -426,7 +429,7 @@ def _execute_statement(self,
try:
LOGGER.debug('Executing statement: %s with params %s', stmt, params)

result = tx.run(str(stmt).encode('utf-8', 'ignore'), parameters=params)
result = tx.run(str(stmt), parameters=params)
if expect_result and not result.single():
raise RuntimeError(f'Failed to executed statement: {stmt}')

Expand Down Expand Up @@ -461,7 +464,7 @@ def _try_create_index(self, label: str) -> None:
with self._driver.session() as session:
try:
session.run(stmt)
except CypherError as e:
except Neo4jError as e:
if 'An equivalent constraint already exists' not in e.__str__():
raise
# Else, swallow the exception, to make this function idempotent.
8 changes: 5 additions & 3 deletions databuilder/task/neo4j_staleness_removal_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ def init(self, conf: ConfigTree) -> None:
else neo4j.TRUST_ALL_CERTIFICATES
self._driver = \
GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY),
max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
max_connection_lifetime=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)),
encrypted=conf.get_bool(NEO4J_ENCRYPTED),
trust=trust)
#trust=trust
)

def run(self) -> None:
"""
Expand Down Expand Up @@ -266,7 +267,8 @@ def _execute_cypher_query(self,
start = time.time()
try:
with self._driver.session() as session:
return session.run(statement, **param_dict)
result = session.run(statement, **param_dict)
return [record for record in result]

finally:
LOGGER.debug('Cypher query execution elapsed for %i seconds', time.time() - start)
5 changes: 2 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ unicodecsv==0.14.1,<1.0
httplib2>=0.18.0
unidecode
Jinja2>=2.10.0,<2.12
pandas>=0.21.0,<1.2.0
pandas>=0.21.0

requests==2.23.0,<3.0
responses==0.10.6

pyatlasclient==1.1.2

amundsen-rds>=0.0.4
neo4j==4.1.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


requirements = [
"neo4j-driver>=1.7.2,<4.0",
"neo4j==4.1.1",
"pytz>=2018.4",
"statsd>=3.2.1",
"retrying>=1.3.3",
Expand Down