Skip to content

Commit

Permalink
[meta-manager] tool to update sigmf annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
Sec42 committed Nov 23, 2024
1 parent 2366836 commit 91b9e15
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions meta-manager
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3

import os
import argparse
import json
import datetime


def Annotation(**kwargs):
t = {}
if 'width' in kwargs:
t['freq_upper_edge'] = round(kwargs['freq']+kwargs['width']/2)
t['freq_lower_edge'] = round(kwargs['freq']-kwargs['width']/2)
del kwargs['width']
del kwargs['freq']
if 'count' in kwargs:
t['sample_count'] = kwargs['count']
del kwargs['count']
if 'start' in kwargs:
t['sample_start'] = kwargs['start']
del kwargs['start']

for a in kwargs:
t[a] = kwargs[a]

return {"core:"+k: v for k, v in t.items()}


parser = argparse.ArgumentParser()

parser.add_argument("-v", "--verbose", action="store_true",
help="increase output verbosity")
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument("-w", "--write", help="write new file")
parser.add_argument("-p", "--peaks", help="add peaks from file")

filters = parser.add_argument_group('filters')
filters.add_argument("--f-min", metavar="freq", type=int, help="minmum frequency")
filters.add_argument("--f-max", metavar="freq", type=int, help="maximum frequency")
filters.add_argument("--t-max", metavar="seconds", type=int, help="maxiumum time")
filters.add_argument("--c-max", metavar="count", type=int, help="maxiumum number of annotations")

parser.add_argument("metafile", help="filename")

args = parser.parse_args()

with open(args.metafile) as f:
data = json.load(f)

assert(len(data["captures"]) == 1)
assert(data["captures"][0]["core:sample_start"] == 0)
assert(data["captures"])

rate = int(data["global"]["core:sample_rate"])
dt = data["captures"][0]["core:datetime"]
freq = int(data["captures"][0]["core:frequency"])

print(f'{args.metafile}: [{data["global"]["core:datatype"]}] {rate/1e6:.9g}Msps @{freq/1e9:.9g}GHz')

# Remove empty annotations
ann = [a for a in data["annotations"] if len(a) > 0]

print(f'in {len(ann)} annotations')

if args.peaks is not None:
with open(args.peaks) as f:
for line in f:
try:
sample, fftbin, _ = line.split(",")
sample = int(sample)
fftbin = int(fftbin)
except main:
pass

bins = 8192

freq = freq + rate * (fftbin / bins - 0.5)
width = rate / bins

a = Annotation(comment="Peak", description="", freq=freq, width=width, start=sample, count=bins)

ann.append(a)


if args.f_max is not None:
ann = [a for a in ann if a["core:freq_upper_edge"] > args.f_max]

if args.f_min is not None:
ann = [a for a in ann if a["core:freq_lower_edge"] < args.f_min]

if args.t_max is not None:
ann = [a for a in ann if a["core:sample_start"] < args.t_max*rate]

if args.c_max is not None:
ann = sorted(ann, key=lambda v: v["core:sample_start"])[:args.c_max]

data["annotations"] = ann

fmin = min([a["core:freq_lower_edge"] for a in data["annotations"]])
fmax = max([a["core:freq_upper_edge"] for a in data["annotations"]])
tmin = min([a["core:sample_start"] for a in data["annotations"]])
tmax = max([a["core:sample_start"] for a in data["annotations"]])

if False:
for a in data["annotations"]:
print(f'{a["core:sample_start"]}')
pass


def s2t(samples): # convert samples to time
return samples / rate


print(f'out {len(data["annotations"])} annotations')
print(f' {fmin/1e9:.6f} - {fmax/1e9:.6f} GHz / {s2t(tmin):.3f} - {s2t(tmax):.3f} sec')


if args.write:
os.rename(args.metafile, args.metafile+".bak")

with open(args.metafile, "w") as f:
json.dump(data, f, indent=0)

0 comments on commit 91b9e15

Please sign in to comment.