#!/usr/bin/python
# CW3E Field Team
# 2024/04
# Adolfo Lopez Miranda
# import modules
import sys
import os
import csv
import json
import requests
import urllib3
import numpy as np
import pandas as pd
import pytz
from datetime import datetime, timedelta, timezone
from pathlib import Path
# disable warnings for Insecure Request Warning
urllib3.disable_warnings() # warnings occur when obtaining a token
# Functions to be used when pulling data from HOBOlink
# function to obtain a new OAuth 2.0 token from the authentication server
def get_new_token(auth_server_url, client_id, client_secret):
token_req_payload = {'grant_type': 'client_credentials'}
token_response = requests.post(auth_server_url,
data=token_req_payload,
verify=False,
allow_redirects=False,
auth=(client_id, client_secret)
)
    if token_response.status_code != 200:
print("Failed to obtain token from the OAuth 2.0 server")
sys.exit(1)
tokens = json.loads(token_response.text)
return tokens['access_token']
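# A minimal usage sketch (the token endpoint and variable names here are
# illustrative; real values come from your HOBOlink developer credentials).
# The bearer token is attached to subsequent API calls via the Authorization header:
#   token = get_new_token("https://webservice.hobolink.com/ws/auth/token", client_id, client_secret)
#   headers = {"Authorization": f"Bearer {token}"}
#   response = requests.get(data_url, headers=headers, verify=False)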
def get_num_lines(fname):
    # Count the lines in a file; returns 0 for an empty file instead of raising an error
    count = 0
    with open(fname, "r") as f:
        for count, _ in enumerate(f, start=1):
            pass
    return count
def csv_timestamp(filename, logging_interval_minutes):
"""
    Reads the last non-empty timestamp from the first column of a CSV file, adjusts it by a given interval,
    and returns the adjusted time both as a URL-ready string and as a datetime object.
Parameters:
- filename (str): Path to the CSV file.
- logging_interval_minutes (int): Number of minutes to add to the last recorded timestamp.
Returns:
- A tuple containing the formatted timestamp string for URL use and the datetime object representing the adjusted time.
"""
# Configure Pandas to read only the first column in chunks
chunk_iterator = pd.read_csv(filename, header=None, skip_blank_lines=True, iterator=True, chunksize=1000, usecols=[0])
date_str = None
for chunk in chunk_iterator:
last_row = chunk.dropna().tail(1) # Use tail to get the last non-empty row
if not last_row.empty:
date_str = last_row.iloc[0, 0] # Assuming timestamp is in the first column
if date_str is None:
raise ValueError("No valid timestamps found in the CSV.")
# Convert string to datetime object and add logging interval
dt_start = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S%z') + timedelta(minutes=logging_interval_minutes)
# Prepare timestamp for URL encoding
start_timestamp = dt_start.strftime("&start_date_time=%Y-%m-%d+%H:%M:%S").replace(':', '%3A')
return start_timestamp, dt_start
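# Worked example (hypothetical file): if the last row of 'ABC_MasterTable.csv'
# carries the timestamp '2024-03-01 00:00:00Z' and the logger records every 15 minutes:
#   start_timestamp, dt_start = csv_timestamp('ABC_MasterTable.csv', 15)
#   # start_timestamp -> '&start_date_time=2024-03-01+00%3A15%3A00'
#   # dt_start        -> 2024-03-01 00:15:00+00:00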
def convert_time(time_string, target_format=None):
"""
Convert a given UTC time string to a format suitable for use in scripts or URLs.
Parameters:
- time_string (str): Time in UTC to be converted, formatted as 'YYYY-MM-DD HH:MM:SSZ'.
- target_format (str, optional): Target format for the conversion. Use 'start' for start time format,
'end' for end time format, or leave as None for no additional formatting.
Returns:
- A tuple of (datetime object, formatted time string). If target_format is None, the second element is the original time string.
"""
utc_format = '%Y-%m-%d %H:%M:%S%z' # Original date and time format with UTC timezone
dt_time = datetime.strptime(time_string, utc_format)
# Base format for URL encoding
base_format = '&{}_date_time=%Y-%m-%d+%H:%M:%S'.format(target_format.lower()) if target_format else None
if base_format:
# Encode the datetime object into the specified URL format
timestamp = dt_time.strftime(base_format).replace(':', '%3A')
else:
# No conversion required, use original time string
timestamp = time_string
return dt_time, timestamp
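# Worked example of the URL encoding performed above:
#   dt, ts = convert_time('2024-03-01 06:30:00Z', target_format='start')
#   # ts -> '&start_date_time=2024-03-01+06%3A30%3A00'
#   dt, ts = convert_time('2024-03-01 06:30:00Z')
#   # ts -> '2024-03-01 06:30:00Z' (returned unchanged when target_format is None)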
def start_time_offset(time_string, logging_int, target_format=None):
    """
    Convert a start time stored in the metadata CSV for use in scripts.
    The time must be provided in UTC, e.g. '2024-03-01 00:00:00Z'.
    Also applies an offset of one logging interval so the next available value is requested.
    """
utc_format = '%Y-%m-%d %H:%M:%S%z' # Original date and time format with UTC timezone
dt_time = datetime.strptime(time_string, utc_format) + timedelta(minutes=logging_int)
# Base format for URL encoding
base_format = '&{}_date_time=%Y-%m-%d+%H:%M:%S'.format(target_format.lower()) if target_format else None
if base_format:
# Encode the datetime object into the specified URL format
timestamp = dt_time.strftime(base_format).replace(':', '%3A')
else:
# No conversion required, use original time string
timestamp = time_string
return dt_time, timestamp
# function to parse the data from the HOBOlink API
def parse_stream(hobolink_data, site_name, cdec, base_path=None, append_to_single_file=False):
"""
Parses JSON data that was pulled and found with the dictionary key "observation_list"
JSON data is passed into the DataFrame into daily CSV files within a specified or default directory structure.
Parameters:
- hobolink_data: data returned by the HOBOLink API call
- site_name: str, the name of the site, used in the directory structure and file naming. Typically a 3 character ID
- base_path: str or None, the base path where files will be saved. If None, uses a default directory structure.
"""
# pass JSON data into a dateframe
df = pd.DataFrame.from_dict(hobolink_data["observation_list"])
# Parse data for stream gage
# Water Level
water_lvl = df.loc[df['sensor_measurement_type'] == 'Water Level']
# Check if the resulting DataFrame is empty
if water_lvl.empty:
# If no water level data is found, set default values
water_lvl_si = np.nan
water_lvl_us = np.nan
timestamp = np.nan
else:
        water_lvl_si = water_lvl['si_value'].round(2).reset_index(drop=True)
        water_lvl_us = water_lvl['us_value'].round(2).reset_index(drop=True)
        # Each 'sensor_measurement_type' carries its own 'timestamp' column; only one copy is needed, so take it from the water level rows
        timestamp = water_lvl['timestamp'].reset_index(drop=True)
# Barometric Pressure
bar_pressure = df.loc[df['sensor_measurement_type'] == 'Barometric Pressure']
# Check if the resulting DataFrame is empty
if bar_pressure.empty:
# If no barometric pressure data is found, set default values
bar_pressure_si = np.nan
bar_pressure_us = np.nan
else:
bar_pressure_si = bar_pressure['si_value'].round(2).reset_index(drop=True)
bar_pressure_us = bar_pressure['us_value'].round(2).reset_index(drop=True)
# Difference Pressure
diff_pressure = df.loc[df['sensor_measurement_type'] == 'Diff Pressure']
# Check if the resulting DataFrame is empty
if diff_pressure.empty:
# If no diff pressure data is found, set default values
diff_pressure_si = np.nan
diff_pressure_us = np.nan
else:
diff_pressure_si = diff_pressure['si_value'].round(2).reset_index(drop=True)
diff_pressure_us = diff_pressure['us_value'].round(2).reset_index(drop=True)
# Water Pressure
water_pressure = df.loc[df['sensor_measurement_type'] == 'Water Pressure']
# Check if the resulting DataFrame is empty
if water_pressure.empty:
        # If no water pressure data is found, derive it from diff pressure plus barometric pressure
        # (np.round handles both Series and scalar NaN inputs, unlike the .round() method)
        water_pressure_si = np.round(bar_pressure_si + diff_pressure_si, 2)
        water_pressure_us = np.round(bar_pressure_us + diff_pressure_us, 2)
else:
water_pressure_si = water_pressure['si_value'].round(2).reset_index(drop=True)
water_pressure_us = water_pressure['us_value'].round(2).reset_index(drop=True)
# Date timestamps for data - each 'sensor_measurement_type' provides a 'timestamp' but only uses one value per timestamp so we drop duplicates
#timestamp = water_pressure['timestamp'].reset_index(drop=True)
# Water Temperature
water_temp = df.loc[df['sensor_measurement_type'] == 'Water Temperature']
# Check if the resulting DataFrame is empty
if water_temp.empty:
# If no water temp data is found, set default values
water_temp_si = np.nan
water_temp_us = np.nan
else:
        water_temp_si = water_temp['si_value'].round(2).reset_index(drop=True)
        water_temp_us = water_temp['us_value'].round(2).reset_index(drop=True)
"""
# snippet to read in water flow sensor measurements if rating curve is set in logger
# commented out to use 100 point rating curve csv files to get discharge
# Attempt to filter for Water Flow measurements
water_flow = df.loc[df['sensor_measurement_type'] == 'Water Flow']
# Check if the resulting DataFrame is empty
if water_flow.empty:
# If no Water Flow data is found, set default values
water_flow_cfs = -9999.99
water_flow_cms = -9999.99
else:
# Proceed with the normal logic for non-empty DataFrame
water_flow_si = water_flow['si_value'].round(2).reset_index(drop=True)
water_flow_cfs = water_flow['us_value'].round(2).reset_index(drop=True)
# Convert L/s to m³/s and round to the second decimal point
water_flow_cms = (water_flow_si * 0.001).round(2)
"""
    # Define the base path used to locate the rating curve file
    # if base_path is None, the rating curve CSV must be stored relative to the directory where the script is running
    dir_path = Path(base_path if base_path else './')
#dir_path.mkdir(parents=True, exist_ok=True) # Ensure directory exists
rating_curve_path = dir_path / f'{site_name}/Rating_Curve/{site_name}.rating_curve_100_points.csv'
# check if there is a rating curve. If not, set discharge to -9999.99
rating_curve_exists = os.path.exists(rating_curve_path)
if rating_curve_exists:
print('Rating curve found.')
# load the rating curve
rating_curve = pd.read_csv(rating_curve_path)
# Apply the rating curve to calculate discharge
water_flow_cfs = water_lvl_us.apply(lambda x: calculate_discharge(x, rating_curve))
water_flow_cfs = water_flow_cfs.round(6)
# Convert from cfs to cms with a vectorized operation
water_flow_cms = np.where(water_flow_cfs == -9999.99,
-9999.99,
water_flow_cfs * 0.0283168).round(6)
else:
print('No rating curve. Setting all discharge to -9999.99')
# Set the discharge to -9999.99
        water_flow_cfs = np.full(len(water_lvl_us), -9999.99)
        water_flow_cms = np.full(len(water_lvl_us), -9999.99)
# Create a new dataframe with the parsed data
df2 = pd.DataFrame({'timestamp_UTC': timestamp,
'water_temperature_Celsius': water_temp_si,
'water_level_m': water_lvl_si,
'water_pressure_kPa': water_pressure_si,
'water_pressure_psi': water_pressure_us,
'diff_pressure_kPa': diff_pressure_si,
'diff_pressure_psi': diff_pressure_us,
'water_temperature_Fahrenheit': water_temp_us,
'water_level_ft': water_lvl_us,
'barometric_pressure_kPa': bar_pressure_si,
'barometric_pressure_psi': bar_pressure_us,
'discharge_cfs': water_flow_cfs,
'discharge_cms': water_flow_cms
})
# Define new columns - used for QC process
new_columns = {
'level_corrected_ft': [-9999.99] * len(df2),
'level_corrected_m': [-9999.99] * len(df2),
'level_corrected_cm': [-9999.99] * len(df2),
'qc_status': ["Provisional"] * len(df2)
}
# Convert new columns to DataFrame
new_columns_df = pd.DataFrame(new_columns)
# Concatenate existing DataFrame with new columns
df2 = pd.concat([df2, new_columns_df], axis=1)
# Define the desired order of columns
desired_column_order = ['timestamp_UTC',
'water_temperature_Celsius',
'water_level_m',
'water_pressure_kPa',
'water_pressure_psi',
'diff_pressure_kPa',
'diff_pressure_psi',
'water_temperature_Fahrenheit',
'water_level_ft',
'barometric_pressure_kPa',
'barometric_pressure_psi',
'level_corrected_ft',
'level_corrected_m',
'level_corrected_cm',
'discharge_cfs',
'discharge_cms',
'qc_status']
# Reorder columns
df2 = df2[desired_column_order]
# resample the data to correct 15min intervals
df2['timestamp_UTC'] = pd.to_datetime(df2['timestamp_UTC'])
# Check if any minute values are not 00, 15, 30, or 45
if any(df2['timestamp_UTC'].dt.minute % 15 != 0):
        print("There are timestamps with irregular minutes. Resampling.")
# Set the timestamp column as the index
df2.set_index('timestamp_UTC', inplace=True)
# Step 1: Convert -9999.99 to NaN
df2.replace(-9999.99, np.nan, inplace=True)
# Define an aggregation dictionary that specifies how to aggregate each column
# For numeric columns, use 'mean'; exclude or use a different function for non-numeric columns
aggregations = {col: 'mean' for col in df2.columns if col != 'qc_status'}
# You can add the 'qc_status' column back later or handle it separately as needed
        # Resample using the defined aggregations ('15min' bins; the old '15T' alias is deprecated in recent pandas)
        df2_resampled = df2.resample('15min').agg(aggregations)
# Round the values to two decimal places
df2_resampled = df2_resampled.round(2)
# Fill NaN values with -9999.99 for numeric columns only
for col in df2_resampled.select_dtypes(include=['number']).columns:
if col != 'qc_status': # Skip 'qc_status' column
df2_resampled[col] = df2_resampled[col].fillna(-9999.99)
#add qc column back in
df2_resampled["qc_status"] = 'Provisional'
# Convert timestamp back to string format
df2_resampled.index = df2_resampled.index.strftime('%Y-%m-%d %H:%M:%SZ')
df2_resampled = df2_resampled.rename_axis('timestamp_UTC').reset_index()
df2 = df2_resampled
else:
print("All timestamps have regular minutes (00, 15, 30, or 45).")
# Convert timestamp back to string format
df2['timestamp_UTC'] = df2['timestamp_UTC'].dt.strftime('%Y-%m-%d %H:%M:%SZ')
# Fill NaN values with -9999.99 for numeric columns only
for col in df2.select_dtypes(include=['number']).columns:
if col != 'qc_status': # Skip 'qc_status' column
df2[col] = df2[col].fillna(-9999.99)
    # store data in the master table for each site
    master_path = Path(base_path if base_path else './') / site_name
master_path.mkdir(parents=True, exist_ok=True)
master_table = f"{site_name}_MasterTable.csv"
master_table_path = master_path / master_table
mode = 'a' if master_table_path.exists() else 'w'
header = not master_table_path.exists()
# record master table csv
df2.to_csv(master_table_path, mode=mode, index=False, header=header, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
os.chmod(master_table_path, 0o775)
df2['timestamp_UTC'] = pd.to_datetime(df2['timestamp_UTC'], format='%Y-%m-%d %H:%M:%S%z')
# Group data by date
grouped = df2.groupby([df2['timestamp_UTC'].dt.year,
df2['timestamp_UTC'].dt.month.apply(lambda x: f'{x:02d}'),
df2['timestamp_UTC'].dt.day.apply(lambda x: f'{x:02d}'),
df2['timestamp_UTC'].dt.hour.apply(lambda x: f'{x:02d}00')])
#filename_list = [] # Initialize the list to record filenames
# store data
if append_to_single_file:
# Define the path for the long-running file
directory_path = Path(base_path if base_path else './')
directory_path.mkdir(parents=True, exist_ok=True) # Ensure directory exists
filename = f"{site_name}_MasterTable.csv"
filepath = directory_path / filename
df2['timestamp_UTC'] = df2['timestamp_UTC'].dt.strftime('%Y-%m-%d %H:%M:%S') + 'Z'
mode = 'a' if filepath.exists() else 'w'
header = not filepath.exists()
# Append data to the single file
df2.to_csv(filepath, mode=mode, index=False, header=header, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
os.chmod(filepath, 0o775)
#filename_list.append(filepath.name) # Record the filename
else:
        for (year, month, day, hour), group in grouped:
            # daily csv files
            directory_path = Path(base_path if base_path else './') / site_name / 'RawDaily' / str(year) / str(month)
            directory_path.mkdir(parents=True, exist_ok=True)
            filename = f"{site_name}_{year}{month}{day}.csv"
            filepath = directory_path / filename
            mode_daily = 'a' if filepath.exists() else 'w'
            header_daily = not filepath.exists()
            # work on a copy to avoid pandas SettingWithCopyWarning on the groupby slice
            group = group.copy()
            # reset 'timestamp_UTC' to the desired format
            group['timestamp_UTC'] = group['timestamp_UTC'].dt.strftime('%Y-%m-%d %H:%M:%S') + 'Z'
# record daily csv file
group.to_csv(filepath, mode=mode_daily, index=False, header=header_daily, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
os.chmod(filepath, 0o775)
#filename_list.append(filepath.name) # Record the filename
#------------------------------------------------------------------------------
# convert files to shef - must be done outside since the other files are in UTC
# Ensure timestamps include a timezone offset instead of just 'UTC'
# If not, you'll need to preprocess them to include a proper offset (e.g., replace 'UTC' with '+0000')
df2['timestamp_UTC'] = pd.to_datetime(df2['timestamp_UTC'], format='%Y-%m-%d %H:%M:%S%z')
# Convert to Pacific Time
pacific = pytz.timezone('US/Pacific')
df2['timestamp_UTC'] = df2['timestamp_UTC'].dt.tz_convert(pacific)
# Group data by date
grouped = df2.groupby([df2['timestamp_UTC'].dt.year,
df2['timestamp_UTC'].dt.month.apply(lambda x: f'{x:02d}'),
df2['timestamp_UTC'].dt.day.apply(lambda x: f'{x:02d}'),
df2['timestamp_UTC'].dt.hour.apply(lambda x: f'{x:02d}')])
for (year, month, day, hour), group in grouped:
#SHEF Hourly Output
        shef_path = Path(base_path if base_path else './') / site_name / 'SHEF_Output' / str(year) / str(month) / str(day)
shef_path.mkdir(parents=True, exist_ok=True)
# Define the filename for SHEF files
shef_file = f"{cdec}_Streamflow_SHEF_{year}{month}{day}{hour}.txt"
# Full path for the file to be saved
shef_file_path = shef_path / shef_file
# Determine the file mode: 'a' to append if the file exists, 'w' to write otherwise
file_mode = 'a' if shef_file_path.exists() else 'w'
# Open the file to write SHEF data
with shef_file_path.open(mode=file_mode) as file:
for _, row in group.iterrows():
# Format the timestamp in SHEF format
timestamp_shef = row['timestamp_UTC'].strftime('%Y%m%d%H%M')
# Write stage and discharge lines in SHEF format
"""
The .A format is designed for the transmission of one or more hydrometeorological parameters observed at various times for a single station.
.A is the format used
P indicates Pacific time
DH = hour of day and also include minute value e.g. for 21:15 will be written as 2115
HGI = river stage (feet)
QRI = discharge (cubic feet per second)
"""
                # If rating_curve_exists, include the discharge in the SHEF line; if not, only include stage data.
if rating_curve_exists:
data_line = f".A {cdec} {timestamp_shef[:8]} P DH{timestamp_shef[8:]} /HGI {format_shef_value(row['water_level_ft'])}/QRI {format_shef_value(row['discharge_cfs'])}"
else:
data_line = f".A {cdec} {timestamp_shef[:8]} P DH{timestamp_shef[8:]} /HGI {format_shef_value(row['water_level_ft'])}"
# Write to the file
file.write(data_line + '\n')
os.chmod(shef_file_path, 0o775)
#SHEF output - append all new data to one file
    shef_path = Path(base_path if base_path else './') / site_name / 'SHEF_Output'
shef_path.mkdir(parents=True, exist_ok=True)
# Define the filename for SHEF files
shef_file = f"{cdec}_Streamflow_SHEF_latest.txt"
# Full path for the file to be saved
shef_file_path = shef_path / shef_file
# Determine the file mode: 'a' to append if the file exists, 'w' to write otherwise
file_mode = 'a' if shef_file_path.exists() else 'w'
# check if there is any timestamp at the start of the new hour (00 minute)
#overwrite = df2['timestamp_UTC'].dt.minute.isin([0]).any()
#file_mode = 'w' if overwrite else 'a'
# Open the file to write SHEF data
with shef_file_path.open(mode=file_mode) as file:
for _, row in df2.iterrows():
# Format the timestamp in SHEF format
timestamp_shef = row['timestamp_UTC'].strftime('%Y%m%d%H%M')
# Write stage and discharge lines in SHEF format
"""
The .A format is designed for the transmission of one or more hydrometeorological parameters observed at various times for a single station.
.A is the format used
P indicates Pacific time
DH = hour of day and also include minute value e.g. for 21:15 will be written as 2115
HGI = river stage (feet)
QRI = discharge (cubic feet per second)
"""
            # If rating_curve_exists, include the discharge in the SHEF line; if not, only include stage data.
if rating_curve_exists:
data_line = f".A {cdec} {timestamp_shef[:8]} P DH{timestamp_shef[8:]} /HGI {format_shef_value(row['water_level_ft'])}/QRI {format_shef_value(row['discharge_cfs'])}"
else:
data_line = f".A {cdec} {timestamp_shef[:8]} P DH{timestamp_shef[8:]} /HGI {format_shef_value(row['water_level_ft'])}"
# Write to the file
file.write(data_line + '\n')
os.chmod(shef_file_path, 0o775)
# Return both the number of records processed and the list of filenames
return df2.shape[0] #, filename_list
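# A typical call, sketched with hypothetical values (parsed HOBOlink JSON in
# `data`, a three-character site ID, a CDEC station code, and an output root):
#   n_records = parse_stream(data, site_name='ABC', cdec='ABC', base_path='/data/streams')
# Daily CSVs are written under <base_path>/<site>/RawDaily/<year>/<month>/ and
# SHEF text files under <base_path>/<site>/SHEF_Output/.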
def backfill_stream(hobolink_data, site_name, base_path=None, append_to_single_file=False):
    # pass JSON data into a dataframe
df = pd.DataFrame.from_dict(hobolink_data["observation_list"])
# Parse data for stream gage
# Water Pressure
water_pressure = df.loc[df['sensor_measurement_type'] == 'Water Pressure']
# Check if the resulting DataFrame is empty
if water_pressure.empty:
# If no water pressure data is found, set default values
water_pressure_si = np.nan
water_pressure_us = np.nan
timestamp = np.nan
else:
water_pressure_si = water_pressure['si_value'].round(2).reset_index(drop=True)
water_pressure_us = water_pressure['us_value'].round(2).reset_index(drop=True)
        # Each 'sensor_measurement_type' carries its own 'timestamp' column; only one copy is needed, so take it from the water pressure rows
timestamp = water_pressure['timestamp'].reset_index(drop=True)
# Difference Pressure
diff_pressure = df.loc[df['sensor_measurement_type'] == 'Diff Pressure']
# Check if the resulting DataFrame is empty
if diff_pressure.empty:
        # If no diff pressure data is found, set default values
diff_pressure_si = np.nan
diff_pressure_us = np.nan
else:
diff_pressure_si = diff_pressure['si_value'].round(2).reset_index(drop=True)
diff_pressure_us = diff_pressure['us_value'].round(2).reset_index(drop=True)
# Water Temperature
water_temp = df.loc[df['sensor_measurement_type'] == 'Water Temperature']
# Check if the resulting DataFrame is empty
if water_temp.empty:
        # If no water temperature data is found, set default values
water_temp_si = np.nan
water_temp_us = np.nan
else:
water_temp_si = water_temp['si_value'].round(2).reset_index(drop=True)
water_temp_us = water_temp['us_value'].round(2).reset_index(drop=True)
# Water Level
water_lvl = df.loc[df['sensor_measurement_type'] == 'Water Level']
# Check if the resulting DataFrame is empty
if water_lvl.empty:
        # If no water level data is found, set default values
water_lvl_si = np.nan
water_lvl_us = np.nan
else:
water_lvl_si = water_lvl['si_value'].round(2).reset_index(drop=True)
water_lvl_us = water_lvl['us_value'].round(2).reset_index(drop=True)
# Barometric Pressure
bar_pressure = df.loc[df['sensor_measurement_type'] == 'Barometric Pressure']
# Check if the resulting DataFrame is empty
if bar_pressure.empty:
# If no barometric pressure data is found, set default values
bar_pressure_si = np.nan
bar_pressure_us = np.nan
else:
bar_pressure_si = bar_pressure['si_value'].round(2).reset_index(drop=True)
bar_pressure_us = bar_pressure['us_value'].round(2).reset_index(drop=True)
# Attempt to filter for Water Flow measurements
water_flow = df.loc[df['sensor_measurement_type'] == 'Water Flow']
# Check if the resulting DataFrame is empty
if water_flow.empty:
# If no Water Flow data is found, set default values
water_flow_cfs = -9999.99
water_flow_cms = -9999.99
else:
# Proceed with the normal logic for non-empty DataFrame
water_flow_si = water_flow['si_value'].round(2).reset_index(drop=True)
water_flow_cfs = water_flow['us_value'].round(2).reset_index(drop=True)
# Convert L/s to m³/s and round to the second decimal point
water_flow_cms = (water_flow_si * 0.001).round(2)
# Create a new dataframe with the parsed data
df2 = pd.DataFrame({'timestamp_UTC': timestamp,
'water_temperature_Celsius': water_temp_si,
'water_level_m': water_lvl_si,
'water_pressure_kPa': water_pressure_si,
'water_pressure_psi': water_pressure_us,
'diff_pressure_kPa': diff_pressure_si,
'diff_pressure_psi': diff_pressure_us,
'water_temperature_Fahrenheit': water_temp_us,
'water_level_ft': water_lvl_us,
'barometric_pressure_kPa': bar_pressure_si,
'barometric_pressure_psi': bar_pressure_us,
'discharge_cfs': water_flow_cfs,
'discharge_cms': water_flow_cms
})
# Define new columns - used for QC process
new_columns = {
'level_corrected_ft': [-9999.99] * len(df2),
'level_corrected_m': [-9999.99] * len(df2),
'level_corrected_cm': [-9999.99] * len(df2),
'qc_status': ["Provisional"] * len(df2)
}
# Convert new columns to DataFrame
new_columns_df = pd.DataFrame(new_columns)
# Concatenate existing DataFrame with new columns
df2 = pd.concat([df2, new_columns_df], axis=1)
# Define the desired order of columns
desired_column_order = ['timestamp_UTC',
'water_temperature_Celsius',
'water_level_m',
'water_pressure_kPa',
'water_pressure_psi',
'diff_pressure_kPa',
'diff_pressure_psi',
'water_temperature_Fahrenheit',
'water_level_ft',
'barometric_pressure_kPa',
'barometric_pressure_psi',
'level_corrected_ft',
'level_corrected_m',
'level_corrected_cm',
'discharge_cfs',
'discharge_cms',
'qc_status']
# Reorder columns
df2 = df2[desired_column_order]
#read csv where data will be backfilled
csv_df = pd.read_csv(site_name + ".csv")
# Iterate over the rows in the CSV DataFrame
for index, csv_row in csv_df.iterrows():
# Find the index of the row in the original DataFrame with the same timestamp
idx = df2[df2['timestamp_UTC'] == csv_row.iloc[0]].index
        if not idx.empty:
            # Replace the row in the CSV DataFrame with the corresponding row from df2
            csv_df.loc[index] = df2.loc[idx].iloc[0]
# Write back the DataFrame to the CSV file, specifying header=True to keep the header
csv_df.to_csv(site_name + ".csv", index=False, header=True, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
#os.chmod(filepath, 0o775)
return df2.shape[0]
# function to parse the data from the HOBOlink API
def parse_precip(hobolink_data, site_name, base_path=None, append_to_single_file=False):
    # pass JSON data into a dataframe
    df = pd.DataFrame.from_dict(hobolink_data["observation_list"])
    # Parse data for PrecipMet Tipping Bucket
precipitation_pulses = df.loc[df['sensor_measurement_type'] == 'Precipitation']
if precipitation_pulses.empty:
        # If no precipitation data is found, set default values
precipitation_mm = np.nan
timestamp = np.nan
else:
precipitation_mm = precipitation_pulses['scaled_value'].round(2).reset_index(drop=True)
        # Each 'sensor_measurement_type' carries its own 'timestamp' column; only one copy is needed
        timestamp = precipitation_pulses['timestamp'].reset_index(drop=True)
# Create a new dataframe with the parsed data
df2 = pd.DataFrame({'timestamp_UTC': timestamp,
'precipitation_mm': precipitation_mm
})
df2['timestamp_UTC'] = pd.to_datetime(df2['timestamp_UTC'], format='%Y-%m-%d %H:%M:%S%z')
# Calculate accumulated precipitation
df2['accumulated_precipitation_mm'] = df2['precipitation_mm'].cumsum()
    # Read the site CSV to get the last recorded accumulated value
    # (join base_path, or the current directory, with the site file name)
    site_csv_path = Path(base_path if base_path else './') / f"{site_name}.csv"
if site_csv_path.exists():
with open(site_csv_path, 'r') as file:
reader = csv.DictReader(file)
last_record = None
for row in reader:
last_record = row
if last_record is not None:
last_accumulated_precip = float(last_record['accumulated_precipitation_mm'])
df2['accumulated_precipitation_mm'] += last_accumulated_precip
    # Identify the index of the first timestamp in October (start of a new water year)
    october_indices = df2[df2['timestamp_UTC'].dt.month == 10].index
if not october_indices.empty:
october_index = october_indices[0]
    else:
        # No October timestamps in this batch, so no water-year reset is needed
        october_index = None
if october_index is not None:
# Reset accumulated precipitation to 0 for each water year
df2['Water_Year'] = (df2['timestamp_UTC'].dt.year + (df2['timestamp_UTC'].dt.month >= 10)).astype(str)
df2.loc[october_index:, 'accumulated_precipitation_mm'] -= df2.loc[october_index, 'accumulated_precipitation_mm']
precip_at_reset = df2.loc[october_index, 'precipitation_mm']
df2.loc[october_index, 'accumulated_precipitation_mm'] = 0
# Add precipitation recorded at the reset moment to the new water year's accumulated total
        if precip_at_reset > 0 and october_index + 1 in df2.index:
            df2.loc[october_index + 1, 'accumulated_precipitation_mm'] += precip_at_reset
df2['accumulated_precipitation_mm'] = df2['accumulated_precipitation_mm'].round(2)
# Group data by date
grouped = df2.groupby([df2['timestamp_UTC'].dt.year,
df2['timestamp_UTC'].dt.month.apply(lambda x: f'{x:02d}'),
df2['timestamp_UTC'].dt.day.apply(lambda x: f'{x:02d}'),
df2['timestamp_UTC'].dt.hour.apply(lambda x: f'{x:02d}00')])
filename_list = [] # Initialize the list to record filenames
if append_to_single_file:
# Define the path for the long-running file
directory_path = Path(base_path if base_path else './')
directory_path.mkdir(parents=True, exist_ok=True) # Ensure directory exists
filename = f"{site_name}.csv"
filepath = directory_path / filename
df2['timestamp_UTC'] = df2['timestamp_UTC'].dt.strftime('%Y-%m-%d %H:%M:%S') + 'Z'
mode = 'a' if filepath.exists() else 'w'
header = not filepath.exists()
# Append data to the single file
df2.to_csv(filepath, mode=mode, index=False, header=header, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
# Set the file permissions to 0775
os.chmod(filepath, 0o775)
filename_list.append(filepath.name) # Record the filename
else:
        for (year, month, day, hour), group in grouped:
            # base_path may be a plain string, so wrap it in Path before joining path components
            directory_path = Path(base_path if base_path else './') / site_name / 'RawDaily' / str(year) / str(month)
            directory_path.mkdir(parents=True, exist_ok=True)
            filename = f"{site_name}_{year}-{month}-{day}.csv"
            filepath = directory_path / filename
            # work on a copy to avoid pandas SettingWithCopyWarning on the groupby slice
            group = group.copy()
            group['timestamp_UTC'] = group['timestamp_UTC'].dt.strftime('%Y-%m-%d %H:%M:%S') + 'Z'
mode = 'a' if filepath.exists() else 'w'
header = not filepath.exists()
group.to_csv(filepath, mode=mode, index=False, header=header, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
os.chmod(filepath, 0o775)
filename_list.append(filepath.name) # Record the filename
# Return both the number of records processed and the list of filenames
return df2.shape[0], filename_list
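# A sketched call with hypothetical values; the function returns both the
# record count and the list of files written:
#   n_records, files = parse_precip(data, site_name='ABC', base_path='/data/precip')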
def backfill_precip(hobolink_data, filename):
    # pass JSON data into a dataframe
    df = pd.DataFrame.from_dict(hobolink_data["observation_list"])
    # Parse data for PrecipMet Tipping Bucket
precipitation_pulses = df.loc[df['sensor_measurement_type'] == 'Precipitation']
if precipitation_pulses.empty:
        # If no precipitation data is found, set default values
precipitation_mm = np.nan
timestamp = np.nan
else:
precipitation_mm = precipitation_pulses['si_value'].round(2).reset_index(drop=True)
        # Each 'sensor_measurement_type' carries its own 'timestamp' column; only one copy is needed
        timestamp = precipitation_pulses['timestamp'].reset_index(drop=True)
# Create a new dataframe with the parsed data
df2 = pd.DataFrame({'timestamp_UTC': timestamp,
'precipitation_mm': precipitation_mm
})
    # Convert timestamps to datetime so the .dt accessor works for the water-year logic below
    df2['timestamp_UTC'] = pd.to_datetime(df2['timestamp_UTC'], format='%Y-%m-%d %H:%M:%S%z')
    # Calculate accumulated total
    df2['accumulated_precipitation_mm'] = df2['precipitation_mm'].cumsum()
# Read the site.csv file to get the last recorded accumulated value
site_csv_path = Path(filename)
if site_csv_path.exists():
with open(site_csv_path, 'r') as file:
reader = csv.DictReader(file)
last_record = None
for row in reader:
last_record = row
if last_record is not None:
last_accumulated_precip = float(last_record['accumulated_precipitation_mm'])
df2['accumulated_precipitation_mm'] += last_accumulated_precip
    # Identify the index of the first timestamp in October (start of a new water year)
    october_indices = df2[df2['timestamp_UTC'].dt.month == 10].index
if not october_indices.empty:
october_index = october_indices[0]
    else:
        # No October timestamps in this batch, so no water-year reset is needed
        october_index = None
if october_index is not None:
# Reset accumulated precipitation to 0 for each water year
df2['Water_Year'] = (df2['timestamp_UTC'].dt.year + (df2['timestamp_UTC'].dt.month >= 10)).astype(str)
df2.loc[october_index:, 'accumulated_precipitation_mm'] -= df2.loc[october_index, 'accumulated_precipitation_mm']
precip_at_reset = df2.loc[october_index, 'precipitation_mm']
df2.loc[october_index, 'accumulated_precipitation_mm'] = 0
# Add precipitation recorded at the reset moment to the new water year's accumulated total
        if precip_at_reset > 0 and october_index + 1 in df2.index:
            df2.loc[october_index + 1, 'accumulated_precipitation_mm'] += precip_at_reset
    # Convert timestamps back to strings so they match the format stored in the CSV
    df2['timestamp_UTC'] = df2['timestamp_UTC'].dt.strftime('%Y-%m-%d %H:%M:%S') + 'Z'
    csv_df = pd.read_csv(filename)
# Iterate over the rows in the CSV DataFrame
for index, csv_row in csv_df.iterrows():
# Find the index of the row in the original DataFrame with the same timestamp
idx = df2[df2['timestamp_UTC'] == csv_row.iloc[0]].index
        if not idx.empty:
            # Replace the row in the CSV DataFrame with the corresponding row from df2
            csv_df.loc[index] = df2.loc[idx].iloc[0]
# Write back the DataFrame to the CSV file, specifying header=True to keep the header
csv_df.to_csv(filename, index=False, header=True, escapechar='\\', quoting=csv.QUOTE_NONNUMERIC)
#os.chmod(filepath, 0o775)
return df2.shape[0]
# split a datetime range into intervals to pull smaller chunks for large data requests
# work-around to avoid hitting the data-limit issues of the HOBOlink API
# currently splits requests into two-week chunks
def timestamp_chunks(start_date, end_date, overlap_delta):
intervals = []
current_date = start_date
while current_date < end_date:
interval_end = min(current_date + timedelta(weeks=2), end_date)
intervals.append((current_date, interval_end - overlap_delta))
current_date = interval_end
# Replace the end of the last interval with the provided end_date
if intervals:
intervals[-1] = (intervals[-1][0], end_date)
return intervals
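# Example: a five-week window split into two-week chunks, with each chunk's end
# trimmed by one 15-minute logging interval so boundary records are not pulled
# twice (dates are illustrative):
#   chunks = timestamp_chunks(datetime(2024, 1, 1), datetime(2024, 2, 5), timedelta(minutes=15))
#   # -> [(Jan 01 00:00, Jan 14 23:45), (Jan 15 00:00, Jan 28 23:45), (Jan 29 00:00, Feb 05 00:00)]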
def find_nan(csv_file):
# Read the CSV file
df = pd.read_csv(csv_file)
# Initialize variables to store timestamp ranges just before and after NaN ranges
nan_ranges = []
current_start = None
#nan_found = False
# Iterate over each row in the DataFrame
for index, row in df.iterrows():
# Check if any NaN values exist in the row
if row.isnull().any() or (row == "").any():
# If not already recording, record the timestamp just before the NaN range begins
if current_start is None:
if index == df.index[0]: # If it's the first row, there's no previous row
current_start = None
else:
current_start = df.loc[df.index.get_loc(index) - 1, 'timestamp_UTC']
else:
# If recording, record the timestamp just after the NaN range ends
if current_start is not None:
nan_ranges.append((current_start, row['timestamp_UTC']))
current_start = None
#nan_found = True
# If recording when reaching the end of the DataFrame
if current_start is not None:
nan_ranges.append((current_start, df.iloc[-1]['timestamp_UTC']))
if not nan_ranges:
return "no"
else:
return nan_ranges
def find_nan_optimized(csv_file):
# Read the CSV file
df = pd.read_csv(csv_file)
# Ensure the 'timestamp_UTC' column exists
if 'timestamp_UTC' not in df.columns:
return "Error: 'timestamp_UTC' column not found in the CSV file."
# Replace empty strings with np.nan for uniformity
df.replace("", np.nan, inplace=True)
# Identify rows with any NaN values
has_nan = df.isnull().any(axis=1)
# Find start and end indices of NaN ranges
starts = np.where(has_nan & ~has_nan.shift(1, fill_value=False))[0]
ends = np.where(has_nan & ~has_nan.shift(-1, fill_value=False))[0]
# Initialize variable to store timestamp ranges
nan_ranges = []
# Iterate over the start and end indices to form timestamp ranges
for start, end in zip(starts, ends):
start_timestamp = df.iloc[start - 1]['timestamp_UTC'] if start > 0 else None
end_timestamp = df.iloc[end]['timestamp_UTC']
nan_ranges.append((start_timestamp, end_timestamp))
return nan_ranges if nan_ranges else "no"
# Custom aggregation function that excludes sentinel no-data values from the mean
def custom_mean(series):
    # Exclude sentinel values (this file writes -9999.99; -9999 is filtered as well for safety)
    filtered_series = series[~series.isin([-9999, -9999.99])]
    if len(filtered_series) == 0:  # If every value was a sentinel
        return -9999.99
    else:
        return filtered_series.mean()
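# Intended as a resampling aggregator so sentinels do not skew averages, e.g.:
#   df.resample('15min').agg({'water_level_ft': custom_mean})
#   # mean of [1.0, 2.0, -9999.99] -> 1.5 rather than about -3332.3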
# Function to calculate discharge using linear interpolation
def calculate_discharge(stage, rating_curve):
    # Check if the rating curve DataFrame is empty
    if rating_curve.empty:
        return -9999.99  # No rating curve, so no discharge can be computed
    # If the stage is outside the range of the rating curve, flag it as out-of-range
    if stage < rating_curve['Level.ft'].min() or stage > rating_curve['Level.ft'].max():
        return -9999.99
# Perform the interpolation
interpolated_value = np.interp(stage, rating_curve['Level.ft'], rating_curve['discharge.cfs'])
# Check if the interpolated value is infinite and replace it with -9999.99 if true
if np.isinf(interpolated_value):
return -9999.99
else:
return interpolated_value
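# Example with a toy three-point rating curve (column names match the
# rating-curve CSVs loaded in parse_stream):
#   rc = pd.DataFrame({'Level.ft': [1.0, 2.0, 3.0], 'discharge.cfs': [10.0, 40.0, 90.0]})
#   calculate_discharge(1.5, rc)  # -> 25.0 (linear interpolation)
#   calculate_discharge(5.0, rc)  # -> -9999.99 (stage above the curve's range)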
# Adjusted function to handle formatting and special case
def format_shef_value(value):
# First, handle the special case of -9999.99
if value == -9999.99:
return "-9999"
# Then, format other values as floating-point numbers with appropriate precision
else:
return f"{value:.2f}"