-
Notifications
You must be signed in to change notification settings - Fork 187
/
Copy pathFactory.cpp
301 lines (256 loc) · 9.76 KB
/
Factory.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// Copyright (c) 2014-2018 Josh Blum
// 2021 Nicholas Corgan
// SPDX-License-Identifier: BSL-1.0
#include <SoapySDR/Device.hpp>
#include <SoapySDR/Registry.hpp>
#include <SoapySDR/Modules.hpp>
#include <SoapySDR/Logger.hpp>
#include <algorithm>
#include <stdexcept>
#include <exception>
#include <future>
#include <iterator>
#include <chrono>
#include <mutex>
static std::recursive_mutex &getFactoryMutex(void)
{
static std::recursive_mutex mutex;
return mutex;
}
typedef std::map<SoapySDR::Kwargs, SoapySDR::Device *> DeviceTable;
static DeviceTable &getDeviceTable(void)
{
static DeviceTable table;
return table;
}
typedef std::map<SoapySDR::Device *, size_t> DeviceCounts;
static DeviceCounts &getDeviceCounts(void)
{
static DeviceCounts table;
return table;
}
void automaticLoadModules(void);
SoapySDR::KwargsList SoapySDR::Device::enumerate(const Kwargs &args)
{
automaticLoadModules(); //perform one-shot load
//enumerate cache data structure
//(driver key, find args) -> (timestamp, handles list)
//Since available devices should not change rapidly,
//the cache allows the enumerate results to persist for some time
//across multiple concurrent callers or subsequent sequential calls.
static std::recursive_mutex cacheMutex;
static std::map<std::pair<std::string, Kwargs>,
std::pair<std::chrono::high_resolution_clock::time_point, std::shared_future<KwargsList>>
> cache;
//clean expired entries from the cache
{
static const auto CACHE_TIMEOUT = std::chrono::seconds(1);
std::lock_guard<std::recursive_mutex> lock(cacheMutex);
const auto now = std::chrono::high_resolution_clock::now();
for (auto it = cache.begin(); it != cache.end();)
{
if (it->second.first+CACHE_TIMEOUT < now) cache.erase(it++);
else it++;
}
}
//launch futures to enumerate devices for each module
std::map<std::string, std::shared_future<KwargsList>> futures;
for (const auto &it : Registry::listFindFunctions())
{
const bool specifiedDriver = args.count("driver") != 0;
if (specifiedDriver and args.at("driver") != it.first) continue;
//protect the cache to search it for results and update it
std::lock_guard<std::recursive_mutex> lock(cacheMutex);
auto &cacheEntry = cache[std::make_pair(it.first, args)];
//use the cache entry if its been initialized (valid) and not expired
if (cacheEntry.second.valid() and cacheEntry.first > std::chrono::high_resolution_clock::now())
{
futures[it.first] = cacheEntry.second;
}
//otherwise create a new future and place it into the cache
else
{
const auto launchType = specifiedDriver?std::launch::deferred:std::launch::async;
futures[it.first] = std::async(launchType, it.second, args);
cacheEntry = std::make_pair(std::chrono::high_resolution_clock::now(), futures[it.first]);
}
}
//collect the asynchronous results
SoapySDR::KwargsList results;
for (auto &it : futures)
{
try
{
for (auto handle : it.second.get())
{
handle["driver"] = it.first;
results.push_back(handle);
}
}
catch (const std::exception &ex)
{
SoapySDR::logf(SOAPY_SDR_ERROR, "SoapySDR::Device::enumerate(%s) %s", it.first.c_str(), ex.what());
}
catch (...)
{
SoapySDR::logf(SOAPY_SDR_ERROR, "SoapySDR::Device::enumerate(%s) unknown error", it.first.c_str());
}
}
return results;
}
SoapySDR::KwargsList SoapySDR::Device::enumerate(const std::string &args)
{
return enumerate(KwargsFromString(args));
}
static SoapySDR::Device* getDeviceFromTable(const SoapySDR::Kwargs &args)
{
if (args.empty()) return nullptr;
const auto it = getDeviceTable().find(args);
if (it == getDeviceTable().end()) return nullptr;
const auto device = it->second;
if (device == nullptr) throw std::runtime_error("SoapySDR::Device::make() device deletion in-progress");
getDeviceCounts()[device]++;
return device;
}
SoapySDR::Device* SoapySDR::Device::make(const Kwargs &inputArgs)
{
std::unique_lock<std::recursive_mutex> lock(getFactoryMutex());
//the arguments may have already come from enumerate and been used to open a device
auto device = getDeviceFromTable(inputArgs);
if (device != nullptr) return device;
//otherwise the args must always come from an enumeration result
//unlock the mutex to block on the enumeration call
Kwargs discoveredArgs;
lock.unlock();
const auto results = Device::enumerate(inputArgs);
if (not results.empty()) discoveredArgs = results.front();
lock.lock();
//check the device table for an already allocated device
device = getDeviceFromTable(discoveredArgs);
if (device != nullptr) return device;
//load the enumeration args with missing keys from the make argument
Kwargs hybridArgs = discoveredArgs;
for (const auto &it : inputArgs)
{
if (hybridArgs.count(it.first) == 0) hybridArgs[it.first] = it.second;
}
//dont continue when driver is unspecified,
//unless there is only one available driver option
const bool specifiedDriver = hybridArgs.count("driver") != 0;
const auto makeFunctions = Registry::listMakeFunctions();
if (not specifiedDriver and makeFunctions.size() > 2) //more than factory: null + one loaded driver
{
throw std::runtime_error("SoapySDR::Device::make() no driver specified and no enumeration results");
}
//search for a cache entry or launch a future if not found
std::map<Kwargs, std::shared_future<Device *>> cache;
std::shared_future<Device *> deviceFuture;
for (const auto &it : makeFunctions)
{
if (not specifiedDriver and it.first == "null") continue; //skip null unless explicitly specified
if (specifiedDriver and hybridArgs.at("driver") != it.first) continue; //filter for driver match
auto &cacheEntry = cache[discoveredArgs];
if (not cacheEntry.valid()) cacheEntry = std::async(std::launch::deferred, it.second, hybridArgs);
deviceFuture = cacheEntry;
break;
}
//no match found for the arguments in the loop above
if (not deviceFuture.valid()) throw std::runtime_error("SoapySDR::Device::make() no match");
//unlock the mutex to block on the factory call
lock.unlock();
deviceFuture.wait();
lock.lock();
//the future is complete, erase the cache entry
//other callers have a copy of the shared future copy or a device table entry
cache.erase(discoveredArgs);
//store into the table
device = deviceFuture.get(); //may throw
getDeviceTable()[discoveredArgs] = device;
getDeviceCounts()[device]++;
return device;
}
SoapySDR::Device *SoapySDR::Device::make(const std::string &args)
{
return make(KwargsFromString(args));
}
void SoapySDR::Device::unmake(Device *device)
{
if (device == nullptr) return; //safe to unmake a null device
std::unique_lock<std::recursive_mutex> lock(getFactoryMutex());
auto countIt = getDeviceCounts().find(device);
if (countIt == getDeviceCounts().end())
{
throw std::runtime_error("SoapySDR::Device::unmake() unknown device");
}
if ((--countIt->second) != 0) return;
//cleanup case for last instance of open device
getDeviceCounts().erase(countIt);
//nullify matching entries in the device table
//make throws if it matches handles which are being deleted
KwargsList argsList;
for (auto &it : getDeviceTable())
{
if (it.second != device) continue;
argsList.push_back(it.first);
it.second = nullptr;
}
//do not block other callers while we wait on destructor
lock.unlock();
delete device;
lock.lock();
//now clean the device table to signal that deletion is complete
for (const auto &args : argsList) getDeviceTable().erase(args);
}
/*******************************************************************
* Parallel support
******************************************************************/
std::vector<SoapySDR::Device *> SoapySDR::Device::make(const KwargsList &argsList)
{
std::vector<std::future<Device *>> futures;
for (const auto &args : argsList)
{
futures.push_back(std::async(std::launch::async, [args]{return SoapySDR::Device::make(args);}));
}
std::vector<Device *> devices;
try
{
for (auto &future : futures) devices.push_back(future.get());
}
catch(...)
{
//cleanup all devices made so far, and squelch their errors
try{SoapySDR::Device::unmake(devices);}
catch(...){}
//and then rethrow the exception after cleanup
throw;
}
return devices;
}
std::vector<SoapySDR::Device *> SoapySDR::Device::make(const std::vector<std::string> &argsList)
{
SoapySDR::KwargsList kwargsList;
std::transform(
argsList.begin(),
argsList.end(),
std::back_inserter(kwargsList),
SoapySDR::KwargsFromString);
return make(kwargsList);
}
void SoapySDR::Device::unmake(const std::vector<Device *> &devices)
{
std::vector<std::future<void>> futures;
for (const auto &device : devices)
{
futures.push_back(std::async(std::launch::async, [device]{SoapySDR::Device::unmake(device);}));
}
//unmake will only throw the last exception
//Since unmake only throws for unknown handles, this is probably API misuse.
//The actual particular exception and its associated device is not important.
std::exception_ptr eptr;
for (auto &future : futures)
{
try {future.get();}
catch(...){eptr = std::current_exception();}
}
if (eptr) std::rethrow_exception(eptr);
}