-
Notifications
You must be signed in to change notification settings - Fork 10
/
SoapySource.cpp
139 lines (120 loc) · 4.78 KB
/
SoapySource.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright (c) 2014-2017 Josh Blum
// SPDX-License-Identifier: BSL-1.0
#include "SoapyBlock.hpp"
#include <SoapySDR/Errors.hpp>
class SDRSource : public SoapyBlock
{
public:
static Block *make(const Pothos::DType &dtype, const std::vector<size_t> &channels)
{
return new SDRSource(dtype, channels);
}
SDRSource(const Pothos::DType &dtype, const std::vector<size_t> &channels):
SoapyBlock(SOAPY_SDR_RX, dtype, channels),
_postTime(false)
{
for (size_t i = 0; i < _channels.size(); i++) this->setupOutput(i, dtype);
}
/*******************************************************************
* Streaming implementation
******************************************************************/
void activate(void)
{
SoapyBlock::activate();
_postTime = true;
}
void work(void)
{
int flags = 0;
long long timeNs = 0;
const size_t numElems = this->workInfo().minOutElements;
if (numElems == 0) return;
const long timeoutUs = this->workInfo().maxTimeoutNs/1000;
const auto &buffs = this->workInfo().outputPointers;
//initial non-blocking read for all available samples that can fit into the buffer
int ret = _device->readStream(_stream, buffs.data(), numElems, flags, timeNs, 0);
//otherwise perform a blocking read on the single transfer unit size (in samples)
if (ret == SOAPY_SDR_TIMEOUT or ret == 0)
{
const auto minNumElems = std::min(numElems, _device->getStreamMTU(_stream));
ret = _device->readStream(_stream, buffs.data(), minNumElems, flags, timeNs, timeoutUs);
}
//handle error
if (ret <= 0)
{
//consider this to mean that the HW produced size 0 transfer
//the flags and time may be valid, but we are discarding here
if (ret == 0) return this->yield();
//got timeout? just call again
if (ret == SOAPY_SDR_TIMEOUT) return this->yield();
//got overflow? call again, discontinuity means repost time
if (ret == SOAPY_SDR_OVERFLOW) _postTime = true;
if (ret == SOAPY_SDR_OVERFLOW) return this->yield();
//otherwise throw an exception with the error code
throw Pothos::Exception("SDRSource::work()", "readStream "+std::string(SoapySDR::errToStr(ret)));
}
//handle packet mode when SOAPY_SDR_ONE_PACKET is specified
//produce a packet with matching labels and pop the buffer
if (_channels.size() <= 1 and (flags & SOAPY_SDR_ONE_PACKET) != 0)
{
auto outPort0 = this->output(0);
//set the packet payload
Pothos::Packet pkt;
pkt.payload = outPort0->buffer();
pkt.payload.setElements(ret);
//turn flags into metadata and labels
if ((flags & SOAPY_SDR_HAS_TIME) != 0)
{
pkt.metadata["rxTime"] = Pothos::Object(timeNs);
pkt.labels.emplace_back("rxTime", timeNs, 0);
}
if ((flags & SOAPY_SDR_END_BURST) != 0)
{
pkt.metadata["rxEnd"] = Pothos::Object(true);
pkt.labels.emplace_back("rxEnd", true, ret-1);
}
//consume buffer, produce message, done work()
outPort0->popElements(ret);
outPort0->postMessage(pkt);
return;
}
//produce output and post pending labels
for (auto output : this->outputs())
{
output->produce(size_t(ret));
//pending rx configuration labels
auto &pending = _pendingLabels.at(output->index());
if (pending.empty()) continue;
for (const auto &pair : pending)
{
output->postLabel(Pothos::Label(pair.first, pair.second, 0));
}
pending.clear();
}
//post labels from stream data
if (_postTime and (flags & SOAPY_SDR_HAS_TIME) != 0)
{
_postTime = false;
for (auto output : this->outputs())
{
output->postLabel("rxTime", timeNs, 0);
}
}
if ((flags & SOAPY_SDR_END_BURST) != 0)
{
_postTime = true; //discontinuity: repost time on next receive
for (auto output : this->outputs())
{
output->postLabel("rxEnd", true, ret-1);
}
}
//discontinuity signaled but ok packet? post time on next call
if ((flags & SOAPY_SDR_END_ABRUPT) != 0) _postTime = true;
}
private:
bool _postTime;
};
//Register the SDRSource factory under its registry path.
static Pothos::BlockRegistry registerSDRSource("/soapy/source", &SDRSource::make);

//Register the same factory a second time under the "/sdr/source" path.
static Pothos::BlockRegistry registerSDRSourceAlias("/sdr/source", &SDRSource::make);