Skip to content

Commit

Permalink
add oqpsk modulation in backend
Browse files Browse the repository at this point in the history
  • Loading branch information
jopohl committed Jun 20, 2019
1 parent 26abbb8 commit 7e0a929
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
32 changes: 28 additions & 4 deletions src/urh/cythonext/signal_functions.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,34 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty
float carrier_amplitude, float carrier_frequency, float carrier_phase, float sample_rate,
uint32_t pause, uint32_t start, iq iq_type,
float gauss_bt=0.5, float filter_width=1.0):
cdef int64_t i = 0, j = 0, index = 0, s_i = 0
cdef uint32_t total_symbols = int(len(bits) // bits_per_symbol)
cdef int64_t i = 0, j = 0, index = 0, s_i = 0, num_bits = len(bits)
cdef uint32_t total_symbols = int(num_bits // bits_per_symbol)
cdef int64_t total_samples = total_symbols * samples_per_symbol + pause

cdef float a = carrier_amplitude, f = carrier_frequency, phi = carrier_phase

cdef float t = 0, arg = 0

result = np.zeros((total_samples, 2), dtype=get_numpy_dtype(iq_type))
if num_bits == 0:
return result

cdef iq[:, ::1] result_view = result

cdef bool is_fsk = modulation_type.lower() == "fsk"
cdef bool is_ask = modulation_type.lower() == "ask"
cdef bool is_psk = modulation_type.lower() == "psk"
cdef bool is_oqpsk = modulation_type.lower() == "oqpsk"
cdef bool is_gfsk = modulation_type.lower() == "gfsk"

assert is_fsk or is_ask or is_psk or is_gfsk
assert is_fsk or is_ask or is_psk or is_gfsk or is_oqpsk

cdef uint8_t[:] oqpsk_bits
if is_oqpsk:
assert bits_per_symbol == 2
bits = get_oqpsk_bits(bits)
samples_per_symbol = samples_per_symbol // 2
total_symbols *= 2

cdef np.ndarray[np.float32_t, ndim=2] gauss_filtered_freqs_phases
if is_gfsk:
Expand All @@ -76,6 +87,7 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty

for s_i in prange(0, total_symbols, schedule="static", nogil=True):
index = bit_array_to_number(bits, end=(s_i+1)*bits_per_symbol, start=s_i*bits_per_symbol)

a = carrier_amplitude
f = carrier_frequency
phi = carrier_phase
Expand All @@ -86,7 +98,7 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty
continue
elif is_fsk:
f = parameters[index]
elif is_psk:
elif is_psk or is_oqpsk:
phi = parameters[index]

for i in range(s_i * samples_per_symbol, (s_i+1)*samples_per_symbol):
Expand All @@ -101,6 +113,18 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty

return result

cdef uint8_t[:] get_oqpsk_bits(uint8_t[:] original_bits):
cdef int64_t i, num_bits = len(original_bits)
result = np.empty(2*num_bits+2, dtype=np.uint8)

for i in range(0, num_bits-1, 2):
result[2*i] = original_bits[i]
result[2*i+2] = original_bits[i]
result[2*i+3] = original_bits[i+1]
result[2*i+5] = original_bits[i+1]

return result[2:len(result)-2]


cdef np.ndarray[np.float32_t, ndim=2] get_gauss_filtered_freqs_phases(uint8_t[:] bits, float[:] parameters,
uint32_t num_symbols, uint32_t samples_per_symbol,
Expand Down
9 changes: 8 additions & 1 deletion tests/test_modulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,19 @@ def test_c_modulation_method_fsk(self):
#result.tofile("/tmp/test_fsk.complex")

def test_c_modulation_method_psk(self):
bits = array.array("B", [0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 1])
bits = array.array("B", [0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1])
parameters = array.array("f", [np.pi/4, 3*np.pi/4, 5*np.pi/4, 7*np.pi/4])
result = modulate_c(bits, 100, "PSK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0, parameters[0])

# result.tofile("/tmp/test_psk.complex")

def test_c_modulation_method_oqpsk(self):
bits = array.array("B", [0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1])
parameters = array.array("f", [np.pi/4, 3*np.pi/4, 5*np.pi/4, 7*np.pi/4])
result = modulate_c(bits, 100, "OQPSK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0, parameters[0])

# result.tofile("/tmp/test_oqpsk.complex")

def test_c_modulation_method_gfsk(self):
bits = array.array("B", [1, 0, 1, 0, 1, 1, 0, 0, 0, 1])
parameters = array.array("f", [-10e3, 10e3])
Expand Down

0 comments on commit 7e0a929

Please sign in to comment.