-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
30 lines (25 loc) · 1.23 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from openhaystack_python.AirTagCrypto import AirTagCrypto
import requests
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
from datetime import datetime
from config import *
from hashlib import md5
# Module-level InfluxDB connection. The influxdb_* settings are not defined in
# this file; presumably they come from the star import of `config` above.
client = InfluxDBClient(
    url=influxdb_url,
    token=influxdb_token,
    org=influxdb_org,
)

# Writer obtained from the client using the ASYNCHRONOUS write option.
write_api = client.write_api(write_options=ASYNCHRONOUS)
def main():
    """Fetch location reports for the configured tags and store them in InfluxDB.

    Builds one ``AirTagCrypto`` per entry in ``private_keys``, posts the
    resulting advertisement keys to ``simple_server_url``, decrypts every
    report in the response's ``results`` list and writes it as one point whose
    measurement name is the report's ``id``.

    Raises:
        requests.RequestException: if the report server cannot be reached,
            does not answer within the timeout, or returns an HTTP error
            status.
    """
    # Map advertisement key -> crypto object so each report can be matched
    # back to the key able to decrypt it.
    tags = {}
    for key in private_keys:
        tag = AirTagCrypto(key)
        tags[tag.get_advertisement_key()] = tag

    # A timeout keeps a stalled server from hanging the script forever, and
    # raise_for_status() turns an HTTP error into a clear exception instead of
    # a confusing JSON/KeyError failure further down.
    response = requests.post(
        simple_server_url,
        json={"ids": list(tags)},
        timeout=60,
    )
    response.raise_for_status()
    data = response.json()

    pending = []
    try:
        for result in data['results']:
            payload = result['payload']
            decrypt = tags[result['id']].decrypt_message(payload)
            timestamp = decrypt['timestamp']
            # No tz is passed, so the tooltip is rendered in the host's
            # local timezone.
            date_time = datetime.fromtimestamp(timestamp)
            point = (
                Point(result['id'])
                # Hash of the payload, used only as an identifier for the report.
                .tag('report_id', md5(payload.encode()).hexdigest())
                .field('latitude', decrypt['lat'])
                .field("longitude", decrypt['lon'])
                .field("tooltip", date_time.strftime("%d/%m/%Y %H:%M:%S"))
                # Timestamp scaled by 1e9 (seconds -> nanoseconds,
                # assuming the decrypted timestamp is in seconds).
                .time(timestamp * 1000000000)
            )
            async_result = write_api.write(influxdb_db, influxdb_org, point)
            if async_result is not None:
                pending.append(async_result)

        # Wait for every asynchronous write so failures are raised here
        # rather than being lost when the process exits.
        for async_result in pending:
            async_result.get()
    finally:
        # Release the writer and the underlying connection even on failure.
        write_api.close()
        client.close()


if __name__ == "__main__":
    main()