Resampling MVP #1495

Merged May 30, 2024 · 34 commits
Changes shown from 1 commit
Commits (34)
b256370
Enhancement 1010: Resampling MVP
alexowens90 Apr 3, 2024
2e67393
Revert to C++17
alexowens90 May 9, 2024
e752b65
Comment changes
alexowens90 May 10, 2024
03df99c
Revert change to lmdb_version_store_tiny_segment
alexowens90 May 10, 2024
6c3739c
Remove check that input has initial expected get calls in split_by_ro…
alexowens90 May 10, 2024
519e56c
Use Bucket class in aggregation as well
alexowens90 May 10, 2024
37d9810
Remove Pandas date_range timing/logging, and modify some formatting
alexowens90 May 10, 2024
8ac1c3b
Move all sorted aggregation stuff to own files
alexowens90 May 10, 2024
cac141e
Renaming refactor
alexowens90 May 13, 2024
28404a8
Renaming refactor
alexowens90 May 13, 2024
6464068
Remove unused function
alexowens90 May 13, 2024
9e79d0f
Remove summing timestamps from supported aggregations
alexowens90 May 13, 2024
76e0baf
Started refactoring aggregation
alexowens90 May 13, 2024
b4e6aa7
Added push_to_aggregator method
alexowens90 May 13, 2024
39ab291
Use push_to_aggregator in the other relevant place
alexowens90 May 14, 2024
91080f6
Fixed test_resampling_unsupported_aggregation_type_combos
alexowens90 May 14, 2024
d2369eb
Factor out finalize_aggregator
alexowens90 May 14, 2024
32547a2
Presize output index column in blocks, and trim unused blocks at the end
alexowens90 May 14, 2024
868ffa0
Use constexpr where possible
alexowens90 May 14, 2024
108867a
Reinstate all tests, reorder source files
alexowens90 May 14, 2024
91e02f4
Comment changes
alexowens90 May 14, 2024
de3bd8e
Use ColumnDataIterator in copy_frame_data_to_buffer
alexowens90 May 14, 2024
7f1d178
Revert accidentally committed change to task scheduler
alexowens90 May 14, 2024
e04b124
Comment updates
alexowens90 May 14, 2024
bb3639c
Move profile_resample.py out of tests directory
alexowens90 May 14, 2024
d9c0506
Resample docstring
alexowens90 May 14, 2024
1d5ab87
Fix mac build?
alexowens90 May 14, 2024
13f598d
Fix tests
alexowens90 May 15, 2024
b7c7a1d
Make resamply.py in ASV benchmarks dir
alexowens90 May 15, 2024
05b92ca
Dummy commit
alexowens90 May 16, 2024
4ecf145
Resampling ASV benchmarks
alexowens90 May 16, 2024
0264dc4
Update benchmarks.json file too
alexowens90 May 16, 2024
cf6772f
Remove ASV features added in 0.6.0
alexowens90 May 17, 2024
c2d994c
Address review comments
alexowens90 May 30, 2024
Update benchmarks.json file too
alexowens90 committed May 16, 2024
commit 0264dc40514b1a59bc9477421774815c176210a5
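The diff below registers the two new ASV benchmarks, resample.Resample.peakmem_resample and resample.Resample.time_resample, in the cached benchmarks.json. Each entry embeds the benchmark source as a single escaped "code" string. Decoded and trimmed for readability, the per-run setup and the measured body look roughly like this (peakmem_resample has an identical body; the setup_cache population loop, the CONNECTION_STRING/LIB_NAME class constants, and the params/param_names class attributes are elided here; the SkipNotImplemented import path is an assumption, it is ASV's standard skip exception):

```python
import pandas as pd
from arcticdb import Arctic, QueryBuilder
from asv_runner.benchmarks.mark import SkipNotImplemented  # import path assumed

class Resample:
    # params / param_names are defined on the class; see the "params" and
    # "param_names" lists in the JSON entries below.

    def setup(self, num_rows, downsampling_factor, col_type, aggregation):
        self.ac = Arctic(self.CONNECTION_STRING)
        self.lib = self.ac[self.LIB_NAME]
        # Data is indexed at microsecond resolution, so num_rows microseconds
        # spans the whole symbol.
        self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit="us"))
        self.query_builder = (
            QueryBuilder().resample(f"{downsampling_factor}us").agg({"col": aggregation})
        )

    def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):
        # Unsupported type/aggregation combinations are skipped rather than timed.
        if (col_type == "datetime" and aggregation == "sum") or (
            col_type == "str" and aggregation in ["sum", "mean", "min", "max"]
        ):
            raise SkipNotImplemented(f"{aggregation} not supported on columns of type {col_type}")
        else:
            self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)
```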
88 changes: 88 additions & 0 deletions python/.asv/results/benchmarks.json
@@ -1244,6 +1244,94 @@
"version": "ed1d1ccb6458095a627788bfa2b53afa310ca8c8118a6405c91204724c865d6c",
"warmup_time": -1
},
"resample.Resample.peakmem_resample": {
"code": "class Resample:\n def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n if col_type == \"datetime\" and aggregation == \"sum\" or col_type == \"str\" and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]:\n raise SkipNotImplemented(f\"{aggregation} not supported on columns of type {col_type}\")\n else:\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n col_types = self.params[2]\n rows = max(self.params[0])\n for col_type in col_types:\n if col_type == \"str\":\n num_unique_strings = 100\n unique_strings = random_strings_of_length(num_unique_strings, 10, True)\n sym = col_type\n num_segments = rows // self.ROWS_PER_SEGMENT\n for idx in range(num_segments):\n index = pd.date_range(pd.Timestamp(idx * self.ROWS_PER_SEGMENT, unit=\"us\"), freq=\"us\", periods=self.ROWS_PER_SEGMENT)\n if col_type == \"int\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n elif col_type == \"bool\":\n col_data = rng.integers(0, 2, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(bool)\n elif col_type == \"float\":\n col_data = 100_000 * rng.random(self.ROWS_PER_SEGMENT)\n elif col_type == \"datetime\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(\"datetime64[s]\")\n elif col_type == \"str\":\n col_data = np.random.choice(unique_strings, self.ROWS_PER_SEGMENT)\n df = pd.DataFrame({\"col\": col_data}, index=index)\n lib.append(sym, df)",
"name": "resample.Resample.peakmem_resample",
"param_names": [
"num_rows",
"downsampling_factor",
"col_type",
"aggregation"
],
"params": [
[
"1000000",
"10000000"
],
[
"10",
"100",
"100000"
],
[
"'bool'",
"'int'",
"'float'",
"'datetime'",
"'str'"
],
[
"'sum'",
"'mean'",
"'min'",
"'max'",
"'first'",
"'last'",
"'count'"
]
],
"setup_cache_key": "resample:38",
"type": "peakmemory",
"unit": "bytes",
"version": "e64300ebb5bd625e1a0f3774aadd035e5738b41295ec2a8ce082d2e9add9b580"
},
"resample.Resample.time_resample": {
"code": "class Resample:\n def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n if col_type == \"datetime\" and aggregation == \"sum\" or col_type == \"str\" and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]:\n raise SkipNotImplemented(f\"{aggregation} not supported on columns of type {col_type}\")\n else:\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n col_types = self.params[2]\n rows = max(self.params[0])\n for col_type in col_types:\n if col_type == \"str\":\n num_unique_strings = 100\n unique_strings = random_strings_of_length(num_unique_strings, 10, True)\n sym = col_type\n num_segments = rows // self.ROWS_PER_SEGMENT\n for idx in range(num_segments):\n index = pd.date_range(pd.Timestamp(idx * self.ROWS_PER_SEGMENT, unit=\"us\"), freq=\"us\", periods=self.ROWS_PER_SEGMENT)\n if col_type == \"int\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n elif col_type == \"bool\":\n col_data = rng.integers(0, 2, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(bool)\n elif col_type == \"float\":\n col_data = 100_000 * rng.random(self.ROWS_PER_SEGMENT)\n elif col_type == \"datetime\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(\"datetime64[s]\")\n elif col_type == \"str\":\n col_data = np.random.choice(unique_strings, self.ROWS_PER_SEGMENT)\n df = pd.DataFrame({\"col\": col_data}, index=index)\n lib.append(sym, df)",
"min_run_count": 2,
"name": "resample.Resample.time_resample",
"number": 5,
"param_names": [
"num_rows",
"downsampling_factor",
"col_type",
"aggregation"
],
"params": [
[
"1000000",
"10000000"
],
[
"10",
"100",
"100000"
],
[
"'bool'",
"'int'",
"'float'",
"'datetime'",
"'str'"
],
[
"'sum'",
"'mean'",
"'min'",
"'max'",
"'first'",
"'last'",
"'count'"
]
],
"repeat": 0,
"rounds": 2,
"sample_time": 0.01,
"setup_cache_key": "resample:38",
"type": "time",
"unit": "seconds",
"version": "2d10a27f3668632f382e90783829b4bb08cabb656c02754c00d5953ee42f3794",
"warmup_time": -1
},
"version": 2,
"version_chain.IterateVersionChain.time_list_undeleted_versions": {
"code": "class IterateVersionChain:\n def time_list_undeleted_versions(self, num_versions, caching, deleted):\n self.lib.list_versions(symbol=self.symbol(num_versions, deleted))\n\n def setup(self, num_versions, caching, deleted):\n # Disable warnings for version not found\n set_log_level(\"ERROR\")\n \n if caching==\"never\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", 0)\n if caching==\"forever\":\n adb._ext.set_config_int(\"VersionMap.ReloadInterval\", sys.maxsize)\n if caching==\"default\":\n # Leave the default reload interval\n pass\n \n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n self.lib = self.ac[IterateVersionChain.LIB_NAME]\n \n if caching != \"never\":\n # Pre-load the cache\n self.load_all(self.symbol(num_versions, deleted))\n\n def setup_cache(self):\n self.ac = Arctic(IterateVersionChain.CONNECTION_STRING)\n num_versions_list, caching_list, deleted_list = IterateVersionChain.params\n \n self.ac.delete_library(IterateVersionChain.LIB_NAME)\n lib = self.ac.create_library(IterateVersionChain.LIB_NAME)\n \n small_df = generate_random_floats_dataframe(2, 2)\n \n for num_versions in num_versions_list:\n for deleted in deleted_list:\n symbol = self.symbol(num_versions, deleted)\n for i in range(num_versions):\n lib.write(symbol, small_df)\n if (i == math.floor(deleted * num_versions)):\n lib.delete(symbol)\n \n del self.ac",

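For reference, a minimal usage sketch of the resample path these benchmarks exercise, based on the QueryBuilder calls embedded in the "code" fields above (the connection string, library name, and symbol name are illustrative, not taken from the benchmark):

```python
import pandas as pd
from arcticdb import Arctic, QueryBuilder

# Illustrative LMDB-backed library; the benchmark uses its own
# CONNECTION_STRING / LIB_NAME constants.
ac = Arctic("lmdb://resample_demo")
lib = ac.get_library("demo", create_if_missing=True)

# Write a microsecond-indexed frame, then downsample it on read.
index = pd.date_range(pd.Timestamp(0), freq="us", periods=1_000)
lib.write("sym", pd.DataFrame({"col": range(1_000)}, index=index))

# Downsample by a factor of 10, aggregating "col" with mean.
qb = QueryBuilder().resample("10us").agg({"col": "mean"})
downsampled = lib.read("sym", query_builder=qb).data
```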