-
Notifications
You must be signed in to change notification settings - Fork 2
/
conftest.py
137 lines (116 loc) · 4.6 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (C) 2022 Sean Anderson <[email protected]>
from contextlib import contextmanager
from datetime import datetime
import os
import logging
import pytest
from testing.postgresql import Postgresql
import trends.importer.logs
import trends.importer.etf2l
import trends.importer.link_matches
from trends.importer.fetch import ETF2LFileFetcher, FileFetcher
from trends.sql import db_connect, db_init
@contextmanager
def caplog_session(request):
    """Provide a LogCaptureFixture outside of a normal fixture context.

    pytest's built-in ``caplog`` fixture is function-scoped, so it cannot be
    used from a session-scoped fixture.  This helper manually drives the
    logging plugin's setup hook for ``request.node`` and hands back a
    LogCaptureFixture for the duration of the ``with`` block.

    NOTE(review): this leans on pytest internals (``_ispytest=True``, the
    'logging-plugin' plugin name, and ``pytest_runtest_setup`` being a
    hookwrapper generator) — may break across pytest versions; confirm on
    upgrade.
    """
    # Stub out report-section recording; we only want capture, not reporting.
    request.node.add_report_section = lambda *args: None
    logging_plugin = request.config.pluginmanager.getplugin('logging-plugin')
    # pytest_runtest_setup is (presumably) a generator-style hookwrapper:
    # iterating it runs the setup half, and we yield the capture fixture
    # while it is suspended so teardown runs when the with-block exits.
    for _ in logging_plugin.pytest_runtest_setup(request.node):
        yield pytest.LogCaptureFixture(request.node, _ispytest=True)
@pytest.fixture(scope='session')
def database(request):
    """Session-scoped fixture yielding a throwaway Postgres populated with test data.

    Spins up a temporary PostgreSQL instance, initializes the trends schema,
    imports a fixed set of sample logs and ETF2L files (failing the session if
    any import logs an ERROR), refreshes materialized views, and links
    matches.  Yields the ``testing.postgresql.Postgresql`` handle; the
    database is torn down when the session ends.
    """
    postgres_args = Postgresql.DEFAULT_SETTINGS['postgres_args']
    # Durability doesn't matter for a throwaway test database; skip
    # full-page writes for speed.
    postgres_args += " -c full_page_writes=off"
    with Postgresql(postgres_args=postgres_args) as database:
        # This must happen in a separate connection because we use temporary tables which will alias
        # other queries.
        with db_connect(database.url()) as c:
            db_init(c)
            cur = c.cursor()
            # Fixed set of sample logs checked in under tests/logs/.
            logfiles = { logid: f"{os.path.dirname(__file__)}/logs/log_{logid}.json"
                for logid in (
                    30099,
                    2408458,
                    2408491,
                    2600722,
                    2818814,
                    2844704,
                    3027588,
                    3069780,
                    3124976,
                )
            }
            # Import errors are logged rather than raised, so capture the log
            # and fail the fixture if anything was recorded at ERROR.
            with caplog_session(request) as caplog:
                with caplog.at_level(logging.ERROR):
                    trends.importer.logs.import_logs(c, FileFetcher(logs=logfiles), False)
                    if caplog.records:
                        pytest.fail("Error importing logs")
        # Fresh connection for the ETF2L import (see temporary-table note above).
        with db_connect(database.url()) as c:
            fetcher = ETF2LFileFetcher(results=f"{os.path.dirname(__file__)}/etf2l/results.json",
                                       xferdir=f"{os.path.dirname(__file__)}/etf2l/")
            with caplog_session(request) as caplog:
                with caplog.at_level(logging.ERROR):
                    trends.importer.etf2l.import_etf2l(c, fetcher)
                    if caplog.records:
                        pytest.fail("Error importing ETF2L files")
        with db_connect(database.url()) as c:
            cur = c.cursor()
            # Refresh planner statistics now that the data is loaded.
            cur.execute("ANALYZE;")
            # A second time to test partitioning log_json
            db_init(c)
            cur.execute("REFRESH MATERIALIZED VIEW leaderboard_cube;")
            cur.execute("REFRESH MATERIALIZED VIEW map_popularity;")
        with db_connect(database.url()) as c:
            # link_matches expects an argparse-style namespace; a throwaway
            # class with the single attribute it reads is enough here.
            class args:
                since = datetime.fromtimestamp(0)
            trends.importer.link_matches.link_matches(args, c)
        yield database
@pytest.fixture(scope='session')
def connection(database):
    """Session-scoped database connection to the populated test database."""
    with db_connect(database.url()) as conn:
        yield conn
@pytest.fixture(scope='session')
def logs(connection):
    """Up to 1000 log ids present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT logid FROM log LIMIT 1000;")
    return [logid for (logid,) in cursor]
@pytest.fixture(scope='session')
def players(connection):
    """Up to 1000 player steamid64s present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT steamid64 FROM player LIMIT 1000;")
    return [steamid for (steamid,) in cursor]
@pytest.fixture(scope='session')
def titles(connection):
    """Up to 1000 log titles present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT title FROM log LIMIT 1000;")
    return [title for (title,) in cursor]
@pytest.fixture(scope='session')
def maps(connection):
    """Up to 1000 map names present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT map FROM map LIMIT 1000;")
    return [map_name for (map_name,) in cursor]
@pytest.fixture(scope='session')
def names(connection):
    """Up to 1000 player names (at least 3 characters long) from the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT name FROM name WHERE length(name) >= 3 LIMIT 1000;")
    return [name for (name,) in cursor]
@pytest.fixture(scope='session')
def compids(connection):
    """Up to 1000 (league, compid) pairs present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT league, compid FROM competition LIMIT 1000;")
    return list(cursor)
@pytest.fixture(scope='session')
def teamids(connection):
    """Up to 1000 (league, teamid) pairs present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT league, teamid FROM league_team LIMIT 1000;")
    return list(cursor)
@pytest.fixture(scope='session')
def comps(connection):
    """Up to 1000 competition names present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT name FROM competition LIMIT 1000;")
    return [comp_name for (comp_name,) in cursor]
@pytest.fixture(scope='session')
def divids(connection):
    """Up to 1000 division ids present in the test database."""
    cursor = connection.cursor()
    cursor.execute("SELECT divid FROM division LIMIT 1000;")
    return [divid for (divid,) in cursor]