diff --git a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelManager.java b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelManager.java index 8ea8ee2a7..9af9a88bc 100644 --- a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelManager.java +++ b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelManager.java @@ -22,10 +22,6 @@ import io.github.dsheirer.buffer.INativeBufferProvider; import io.github.dsheirer.buffer.NativeBufferPoisonPill; import io.github.dsheirer.controller.channel.event.ChannelStopProcessingRequest; -import io.github.dsheirer.dsp.filter.FilterFactory; -import io.github.dsheirer.dsp.filter.channelizer.output.IPolyphaseChannelOutputProcessor; -import io.github.dsheirer.dsp.filter.channelizer.output.OneChannelOutputProcessor; -import io.github.dsheirer.dsp.filter.channelizer.output.TwoChannelOutputProcessor; import io.github.dsheirer.dsp.filter.design.FilterDesignException; import io.github.dsheirer.eventbus.MyEventBus; import io.github.dsheirer.sample.Broadcaster; @@ -44,10 +40,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.math3.util.FastMath; import org.slf4j.Logger; @@ -75,12 +69,13 @@ public class PolyphaseChannelManager implements ISourceEventProcessor private static final double MINIMUM_CHANNEL_BANDWIDTH = 25000.0; private static final double CHANNEL_OVERSAMPLING = 2.0; private static final int POLYPHASE_CHANNELIZER_TAPS_PER_CHANNEL = 9; - private static final int POLYPHASE_SYNTHESIZER_TAPS_PER_CHANNEL = 9; + public static final int POLYPHASE_SYNTHESIZER_TAPS_PER_CHANNEL = 9; private Broadcaster mSourceEventBroadcaster = new Broadcaster<>(); private INativeBufferProvider mNativeBufferProvider; private List mChannelSources = new CopyOnWriteArrayList<>(); private ChannelCalculator mChannelCalculator; + private SynthesisFilterManager mFilterManager = new SynthesisFilterManager(); private ComplexPolyphaseChannelizerM2 mPolyphaseChannelizer; private ChannelSourceEventListener mChannelSourceEventListener = new ChannelSourceEventListener(); private NativeBufferReceiver mNativeBufferReceiver = new NativeBufferReceiver(); @@ -181,63 +176,22 @@ public TunerChannelSource getChannel(TunerChannel tunerChannel) if(mRunning) { - List polyphaseIndexes = mChannelCalculator.getChannelIndexes(tunerChannel); - - IPolyphaseChannelOutputProcessor outputProcessor = getOutputProcessor(polyphaseIndexes); - - if(outputProcessor != null) + try { - long centerFrequency = mChannelCalculator.getCenterFrequencyForIndexes(polyphaseIndexes); - - try - { - channelSource = new PolyphaseChannelSource(tunerChannel, outputProcessor, mChannelSourceEventListener, - mChannelCalculator.getChannelSampleRate(), centerFrequency); + channelSource = new PolyphaseChannelSource(tunerChannel, mChannelCalculator, mFilterManager, + mChannelSourceEventListener); - mChannelSources.add(channelSource); - } - catch(FilterDesignException fde) - { - mLog.debug("Couldn't design final output low pass filter for polyphase channel source"); - } + mChannelSources.add(channelSource); + } + catch(IllegalArgumentException iae) + { + mLog.debug("Couldn't design final output low pass filter for polyphase channel source"); } } return channelSource; } - /** - * Creates a processor to process the channelizer channel indexes into a composite output stream providing - * channelized complex sample buffers to a registered source listener. - * @param indexes to target by the output processor - * @return output processor compatible with the number of indexes to monitor - */ - private IPolyphaseChannelOutputProcessor getOutputProcessor(List indexes) - { - switch(indexes.size()) - { - case 1: - return new OneChannelOutputProcessor(mChannelCalculator.getChannelSampleRate(), indexes, - mChannelCalculator.getChannelCount()); - case 2: - try - { - float[] filter = getOutputProcessorFilter(2); - return new TwoChannelOutputProcessor(mChannelCalculator.getChannelSampleRate(), indexes, filter, - mChannelCalculator.getChannelCount()); - } - catch(FilterDesignException fde) - { - mLog.error("Error designing 2 channel synthesis filter for output processor"); - } - default: - //TODO: create output processor for greater than 2 input channels - mLog.error("Request to create an output processor for unexpected channel index size:" + indexes.size()); - mLog.info(mChannelCalculator.toString()); - return null; - } - } - /** * Starts/adds the channel source to receive channelized sample buffers, registering with the tuner to receive * sample buffers when this is the first channel. @@ -311,10 +265,6 @@ public void process(SourceEvent sourceEvent) throws SourceException switch(sourceEvent.getEvent()) { case NOTIFICATION_FREQUENCY_CHANGE: - //Update channel calculator immediately so that channels can be allocated - mChannelCalculator.setCenterFrequency(sourceEvent.getValue().longValue()); - - //Defer channelizer configuration changes to be handled on the buffer processor thread mNativeBufferReceiver.receive(sourceEvent); break; case NOTIFICATION_SAMPLE_RATE_CHANGE: @@ -381,80 +331,22 @@ private void checkChannelizerConfiguration() * Updates each of the output processors for any changes in the tuner's center frequency or sample rate, which * would cause the output processors to change the polyphase channelizer results channel(s) that the processor is * consuming - * - * @param sourceEvent (optional-can be null) to broadcast to each output processor following the update */ - private void updateOutputProcessors(SourceEvent sourceEvent) + private void updateOutputProcessors() { for(PolyphaseChannelSource channelSource: mChannelSources) { - updateOutputProcessor(channelSource); - - //Send the non-null source event to each channel source - if(sourceEvent != null) - { - try - { - channelSource.process(sourceEvent); - } - catch(SourceException se) - { - mLog.error("Error while notifying polyphase channel source of a source event", se); - } - } - } - } - - /** - * Updates the polyphase channel source's output processor due to a change in the center frequency or sample - * rate for the source providing sample buffers to the polyphase channelizer, or whenever the DDC channel's - * center tuned frequency changes. - * - * @param channelSource that requires an update to its output processor - */ - private void updateOutputProcessor(PolyphaseChannelSource channelSource) - { - try - { - //If a change in sample rate or center frequency makes this channel no longer viable, then the channel - //calculator will throw an IllegalArgException ... handled below - List indexes = mChannelCalculator.getChannelIndexes(channelSource.getTunerChannel()); - - long centerFrequency = mChannelCalculator.getCenterFrequencyForIndexes(indexes); - - //If the indexes size is the same then update the current processor, otherwise create a new one - IPolyphaseChannelOutputProcessor outputProcessor = channelSource.getPolyphaseChannelOutputProcessor(); - - if(outputProcessor != null && outputProcessor.getInputChannelCount() == indexes.size()) + try { - channelSource.getPolyphaseChannelOutputProcessor().setPolyphaseChannelIndices(indexes); - channelSource.setFrequency(centerFrequency); - - if(indexes.size() > 1) - { - try - { - float[] filter = getOutputProcessorFilter(indexes.size()); - channelSource.getPolyphaseChannelOutputProcessor().setSynthesisFilter(filter); - } - catch(FilterDesignException fde) - { - mLog.error("Error creating an updated synthesis filter for the channel output processor"); - } - } + channelSource.updateOutputProcessor(mChannelCalculator, mFilterManager); } - else + catch(IllegalArgumentException iae) { - channelSource.setPolyphaseChannelOutputProcessor(getOutputProcessor(indexes), centerFrequency); + mLog.error("Error updating polyphase channel source output processor following tuner frequency or " + + "sample rate change"); + stopChannelSource(channelSource); } } - catch(IllegalArgumentException iae) - { - mLog.error("Error updating polyphase channel source - can't determine output channel indexes for " + - "updated tuner center frequency and sample rate. Stopping channel source", iae); - - stopChannelSource(channelSource); - } } /** @@ -497,29 +389,6 @@ public void removeSourceEventListener(Listener listener) mSourceEventBroadcaster.removeListener(listener); } - /** - * Generates (or reuses) an output processor filter for the specified number of channels. Each - * filter is created only once and stored in a map for reuse. This map is cleared anytime that the - * input sample rate changes, so that the filters can be recreated with the new channel sample rate. - * @param channels count - * @return filter - * @throws FilterDesignException if the filter cannot be designed to specification (-6 dB band edge) - */ - private float[] getOutputProcessorFilter(int channels) throws FilterDesignException - { - float[] taps = mOutputProcessorFilters.get(channels); - - if(taps == null) - { - taps = FilterFactory.getSincM2Synthesizer(mChannelCalculator.getChannelSampleRate(), - mChannelCalculator.getChannelBandwidth(), channels, POLYPHASE_SYNTHESIZER_TAPS_PER_CHANNEL); - - mOutputProcessorFilters.put(channels, taps); - } - - return taps; - } - /** * Internal class for handling requests for start/stop sample stream from polyphase channel sources */ @@ -584,7 +453,7 @@ public void receive(SourceEvent sourceEvent) */ public class NativeBufferReceiver implements Listener { - private Queue mQueuedSourceEvents = new ConcurrentLinkedQueue<>(); + private boolean mOutputProcessorUpdateRequired = false; /** * Queues the source event for deferred execution on the buffer processing thread. @@ -592,7 +461,13 @@ public class NativeBufferReceiver implements Listener */ public void receive(SourceEvent event) { - mQueuedSourceEvents.offer(event); + long frequency = event.getValue().longValue(); + + if(mChannelCalculator.getCenterFrequency() != frequency) + { + mChannelCalculator.setCenterFrequency(frequency); + mOutputProcessorUpdateRequired = true; + } } @Override @@ -600,20 +475,10 @@ public void receive(INativeBuffer nativeBuffer) { try { - //Process any queued source events before processing the buffers - SourceEvent queuedSourceEvent = mQueuedSourceEvents.poll(); - - while(queuedSourceEvent != null) + if(mOutputProcessorUpdateRequired) { - switch(queuedSourceEvent.getEvent()) - { - case NOTIFICATION_FREQUENCY_CHANGE: - //Don't send the tuner's frequency change event down to the channels - it would cause chaos - updateOutputProcessors(null); - break; - } - - queuedSourceEvent = mQueuedSourceEvents.poll(); + updateOutputProcessors(); + mOutputProcessorUpdateRequired = false; } if(mPolyphaseChannelizer != null) diff --git a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelSource.java b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelSource.java index a8e56f8a0..fb90d83fd 100644 --- a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelSource.java +++ b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/PolyphaseChannelSource.java @@ -19,6 +19,8 @@ package io.github.dsheirer.dsp.filter.channelizer; import io.github.dsheirer.dsp.filter.channelizer.output.IPolyphaseChannelOutputProcessor; +import io.github.dsheirer.dsp.filter.channelizer.output.OneChannelOutputProcessor; +import io.github.dsheirer.dsp.filter.channelizer.output.TwoChannelOutputProcessor; import io.github.dsheirer.dsp.filter.design.FilterDesignException; import io.github.dsheirer.sample.Listener; import io.github.dsheirer.sample.complex.ComplexSamples; @@ -26,8 +28,10 @@ import io.github.dsheirer.source.tuner.channel.StreamProcessorWithHeartbeat; import io.github.dsheirer.source.tuner.channel.TunerChannel; import io.github.dsheirer.source.tuner.channel.TunerChannelSource; - import java.util.List; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Polyphase Channelizer's Tuner Channel Source implementation. Wraps a ChannelOutputProcessor instance and @@ -36,34 +40,30 @@ */ public class PolyphaseChannelSource extends TunerChannelSource implements Listener { + private Logger mLog = LoggerFactory.getLogger(PolyphaseChannelSource.class); private IPolyphaseChannelOutputProcessor mPolyphaseChannelOutputProcessor; - private IPolyphaseChannelOutputProcessor mReplacementPolyphaseChannelOutputProcessor; private StreamProcessorWithHeartbeat mStreamHeartbeatProcessor; - private long mReplacementFrequency; private double mChannelSampleRate; private long mIndexCenterFrequency; private long mChannelFrequencyCorrection; + private ReentrantLock mOutputProcessorLock = new ReentrantLock(); /** * Constructs an instance * * @param tunerChannel describing the desired channel frequency and bandwidth/minimum sample rate - * @param outputProcessor - to process polyphase channelizer channel results into a channel stream + * @param channelCalculator for current channel center frequency and sample rate and index calculations + * @param filterManager for access to new or cached synthesis filters * @param producerSourceEventListener to receive source event requests (e.g. start/stop sample stream) - * @param channelSampleRate for the downstream sample output - * @param centerFrequency of the incoming polyphase channel(s) - * @throws FilterDesignException if a channel low pass filter can't be designed to the channel specification + * @throws IllegalArgumentException if a channel low pass filter can't be designed to the channel specification */ - public PolyphaseChannelSource(TunerChannel tunerChannel, IPolyphaseChannelOutputProcessor outputProcessor, - Listener producerSourceEventListener, double channelSampleRate, - long centerFrequency) throws FilterDesignException + public PolyphaseChannelSource(TunerChannel tunerChannel, ChannelCalculator channelCalculator, SynthesisFilterManager filterManager, + Listener producerSourceEventListener) throws IllegalArgumentException { super(producerSourceEventListener, tunerChannel); - mPolyphaseChannelOutputProcessor = outputProcessor; - mPolyphaseChannelOutputProcessor.setListener(this); - mChannelSampleRate = channelSampleRate; + mChannelSampleRate = channelCalculator.getChannelSampleRate(); mStreamHeartbeatProcessor = new StreamProcessorWithHeartbeat<>(getHeartbeatManager(), HEARTBEAT_INTERVAL_MS); - setFrequency(centerFrequency); + updateOutputProcessor(channelCalculator, filterManager); } @Override @@ -109,58 +109,101 @@ public void receive(ComplexSamples complexSamples) } /** - * Channel output processor used by this channel source to convert polyphase channel results into a specific - * channel complex buffer output stream. + * Updates the output processor whenever the source tuner's center frequency changes. + * @param channelCalculator providing access to updated tuner center frequency, sample rate, etc. + * @param filterManager for designing and caching synthesis filters */ - public IPolyphaseChannelOutputProcessor getPolyphaseChannelOutputProcessor() + public void updateOutputProcessor(ChannelCalculator channelCalculator, SynthesisFilterManager filterManager) throws IllegalArgumentException { - return mPolyphaseChannelOutputProcessor; - } + String errorMessage = null; - /** - * Sets/updates the output processor for this channel source, replacing the existing output processor. - * - * @param outputProcessor to use - * @param frequency center for the channels processed by the output processor - */ - public void setPolyphaseChannelOutputProcessor(IPolyphaseChannelOutputProcessor outputProcessor, long frequency) - { - //If this is the first time, simply assign the output processor - if(mPolyphaseChannelOutputProcessor == null) + mOutputProcessorLock.lock(); + + try { - mPolyphaseChannelOutputProcessor = outputProcessor; - mPolyphaseChannelOutputProcessor.setListener(this); + //If a change in sample rate or center frequency makes this channel no longer viable, then the channel + //calculator will throw an IllegalArgException ... handled below + List indexes = channelCalculator.getChannelIndexes(getTunerChannel()); + + //The provided channels are necessarily aligned to the center frequency that this source is providing and an + //oscillator will mix the provided channels to bring the desired center frequency to baseband. + setFrequency(channelCalculator.getCenterFrequencyForIndexes(indexes)); + + if(mPolyphaseChannelOutputProcessor != null && mPolyphaseChannelOutputProcessor.getInputChannelCount() == indexes.size()) + { + mPolyphaseChannelOutputProcessor.setPolyphaseChannelIndices(indexes); + + if(indexes.size() > 1) + { + try + { + float[] filter = filterManager.getFilter(channelCalculator.getSampleRate(), + channelCalculator.getChannelBandwidth(), indexes.size()); + mPolyphaseChannelOutputProcessor.setSynthesisFilter(filter); + } + catch(FilterDesignException fde) + { + mLog.error("Error creating an updated synthesis filter for the channel output processor"); + errorMessage ="Cannot update output processor - unable to design synthesis filter for [" + + indexes.size() + "] channel indices"; + } + } + + mPolyphaseChannelOutputProcessor.setFrequencyOffset(getFrequencyOffset()); + } + else //Create a new output processor. + { + if(mPolyphaseChannelOutputProcessor != null) + { + mPolyphaseChannelOutputProcessor.setListener(null); + } + + mPolyphaseChannelOutputProcessor = null; + + switch(indexes.size()) + { + case 1: + mPolyphaseChannelOutputProcessor = new OneChannelOutputProcessor(channelCalculator.getChannelSampleRate(), + indexes, channelCalculator.getChannelCount()); + mPolyphaseChannelOutputProcessor.setListener(this); + mPolyphaseChannelOutputProcessor.setFrequencyOffset(getFrequencyOffset()); + mPolyphaseChannelOutputProcessor.start(); + break; + case 2: + try + { + float[] filter = filterManager.getFilter(channelCalculator.getChannelSampleRate(), + channelCalculator.getChannelBandwidth(), 2); + mPolyphaseChannelOutputProcessor = new TwoChannelOutputProcessor(channelCalculator.getChannelSampleRate(), + indexes, filter, channelCalculator.getChannelCount()); + mPolyphaseChannelOutputProcessor.setListener(this); + mPolyphaseChannelOutputProcessor.setFrequencyOffset(getFrequencyOffset()); + mPolyphaseChannelOutputProcessor.start(); + } + catch(FilterDesignException fde) + { + errorMessage = "Cannot create new output processor - unable to design synthesis filter for [" + + indexes.size() + "] channel indices"; + mLog.error("Error creating a synthesis filter for a new channel output processor"); + } + break; + default: + mLog.error("Request to create an output processor for unexpected channel index size:" + indexes.size()); + mLog.info(channelCalculator.toString()); + errorMessage = "Unable to create new channel output processor - unexpected channel index size: " + + indexes.size(); + } + } } - //Otherwise, we have to swap out the processor on the sample processing thread - else + finally { - mReplacementPolyphaseChannelOutputProcessor = outputProcessor; - mReplacementFrequency = frequency; + mOutputProcessorLock.unlock(); } - } - /** - * Updates the output processor to use the new output processor provided by the external - * polyphase channel manager. This method should only be executed on the sample processing - * thread, within the processChannelResults() method. - */ - private void swapOutputProcessor() - { - if(mReplacementPolyphaseChannelOutputProcessor != null) + //Unlikely, but if we had an error designing a synthesis filter, throw an exception + if(errorMessage != null) { - IPolyphaseChannelOutputProcessor existingProcessor = mPolyphaseChannelOutputProcessor; - existingProcessor.stop(); - existingProcessor.setListener(null); - - //Swap out the processor so that incoming samples can accumulate in the new channel output processor - mPolyphaseChannelOutputProcessor = mReplacementPolyphaseChannelOutputProcessor; - mReplacementPolyphaseChannelOutputProcessor = null; - mPolyphaseChannelOutputProcessor.setListener(this); - mPolyphaseChannelOutputProcessor.start(); - - //Finally, setup the frequency offset for the output processor. - mIndexCenterFrequency = mReplacementFrequency; - mPolyphaseChannelOutputProcessor.setFrequencyOffset(getFrequencyOffset()); + throw new IllegalArgumentException(errorMessage); } } @@ -173,12 +216,19 @@ private void swapOutputProcessor() */ public void receiveChannelResults(List channelResultsList) { - if(mReplacementPolyphaseChannelOutputProcessor != null) + mOutputProcessorLock.lock(); + + try { - swapOutputProcessor(); + if(mPolyphaseChannelOutputProcessor != null) + { + mPolyphaseChannelOutputProcessor.receiveChannelResults(channelResultsList); + } + } + finally + { + mOutputProcessorLock.unlock(); } - - mPolyphaseChannelOutputProcessor.receiveChannelResults(channelResultsList); } /** @@ -226,7 +276,6 @@ public long getChannelFrequencyCorrection() */ protected void setChannelFrequencyCorrection(long value) { -//TODO: push this frequency correction down to the output processor ... mChannelFrequencyCorrection = value; updateFrequencyOffset(); broadcastConsumerSourceEvent(SourceEvent.frequencyCorrectionChange(mChannelFrequencyCorrection)); @@ -262,7 +311,10 @@ private long getFrequencyOffset() */ private void updateFrequencyOffset() { - mPolyphaseChannelOutputProcessor.setFrequencyOffset(getFrequencyOffset()); + if(mPolyphaseChannelOutputProcessor != null) + { + mPolyphaseChannelOutputProcessor.setFrequencyOffset(getFrequencyOffset()); + } } @Override diff --git a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/SynthesisFilterManager.java b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/SynthesisFilterManager.java new file mode 100644 index 000000000..eb8a81f20 --- /dev/null +++ b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/SynthesisFilterManager.java @@ -0,0 +1,57 @@ +/* + * ***************************************************************************** + * Copyright (C) 2014-2022 Dennis Sheirer + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * **************************************************************************** + */ + +package io.github.dsheirer.dsp.filter.channelizer; + +import io.github.dsheirer.dsp.filter.FilterFactory; +import io.github.dsheirer.dsp.filter.design.FilterDesignException; +import java.util.HashMap; +import java.util.Map; + +/** + * Creates and caches channel output processor synthesis filters. + */ +public class SynthesisFilterManager +{ + private static final String SEPARATOR = "-"; + private Map mFilterMap = new HashMap<>(); + + /** + * Design or retrieve a previously cached output processor synthesis filter. + * @param sampleRate of the tuner + * @param channelBandwidth per channel + * @param channelCount as the number of channels being synthesized/aggregated for the output processor (1 or 2) + * @return filter + * @throws FilterDesignException if the filter cannot be designed based on the supplied parameters. + */ + public float[] getFilter(double sampleRate, double channelBandwidth, int channelCount) throws FilterDesignException + { + String key = sampleRate + SEPARATOR + channelBandwidth + SEPARATOR + channelCount; + + if(mFilterMap.containsKey(key)) + { + return mFilterMap.get(key); + } + + float[] taps = FilterFactory.getSincM2Synthesizer(sampleRate, channelBandwidth, channelCount, + PolyphaseChannelManager.POLYPHASE_SYNTHESIZER_TAPS_PER_CHANNEL); + mFilterMap.put(key, taps); + return taps; + } +} diff --git a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/IPolyphaseChannelOutputProcessor.java b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/IPolyphaseChannelOutputProcessor.java index 286f6c17c..ec3474516 100644 --- a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/IPolyphaseChannelOutputProcessor.java +++ b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/IPolyphaseChannelOutputProcessor.java @@ -20,7 +20,6 @@ import io.github.dsheirer.sample.Listener; import io.github.dsheirer.sample.complex.ComplexSamples; - import java.util.List; public interface IPolyphaseChannelOutputProcessor @@ -45,11 +44,6 @@ public interface IPolyphaseChannelOutputProcessor */ void setListener(Listener listener); -// /** -// * Process the channel output channel results queue and deliver the output to the listener -// */ -// void processChannelResults(); - /** * Sets the desired frequency offset from center. The samples will be mixed with an oscillator set to this offset * frequency to produce an output where the desired signal is centered in the passband. diff --git a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/OneChannelOutputProcessor.java b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/OneChannelOutputProcessor.java index 4fe01f014..f6bdaaa1e 100644 --- a/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/OneChannelOutputProcessor.java +++ b/src/main/java/io/github/dsheirer/dsp/filter/channelizer/output/OneChannelOutputProcessor.java @@ -19,11 +19,10 @@ package io.github.dsheirer.dsp.filter.channelizer.output; import io.github.dsheirer.sample.complex.ComplexSamples; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - public class OneChannelOutputProcessor extends ChannelOutputProcessor { private final static Logger mLog = LoggerFactory.getLogger(OneChannelOutputProcessor.class); @@ -80,7 +79,7 @@ public void setFrequencyOffset(long frequency) * Extract the channel from the channel results array and pass to the assembler. The assembler will * apply frequency translation and gain and indicate when a buffer is fully assembled. * - * @param lists to process containing a list of a list of channel array of I/Q sample pairs (I0,Q0,I1,Q1...In,Qn) + * @param channelResultsList to process containing a list of a list of channel array of I/Q sample pairs (I0,Q0,I1,Q1...In,Qn) */ @Override public void process(List channelResultsList) diff --git a/src/main/java/io/github/dsheirer/sample/complex/ComplexSampleListenerModule.java b/src/main/java/io/github/dsheirer/sample/complex/ComplexSampleListenerModule.java new file mode 100644 index 000000000..42c785a59 --- /dev/null +++ b/src/main/java/io/github/dsheirer/sample/complex/ComplexSampleListenerModule.java @@ -0,0 +1,92 @@ +/* + * ***************************************************************************** + * Copyright (C) 2014-2022 Dennis Sheirer + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * **************************************************************************** + */ + +package io.github.dsheirer.sample.complex; + +import io.github.dsheirer.buffer.INativeBuffer; +import io.github.dsheirer.module.Module; +import io.github.dsheirer.sample.Broadcaster; +import io.github.dsheirer.sample.Listener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Module that receives and rebroadcasts complex samples. + */ +public class ComplexSampleListenerModule extends Module implements IComplexSamplesListener, Listener +{ + private static final Logger mLog = LoggerFactory.getLogger(ComplexSampleListenerModule.class); + private Broadcaster mBroadcaster = new Broadcaster<>(); + + /** + * Implements the interface + * @return this. + */ + @Override + public Listener getComplexSamplesListener() + { + return this; + } + + /** + * Primary receive method for sample buffers to rebroadcast. + * @param complexSamples to rebroadcast + */ + @Override + public void receive(ComplexSamples complexSamples) + { + mBroadcaster.broadcast(new ComplexSamplesNativeBufferAdapter(complexSamples)); + } + + /** + * Adds listener to receive rebroadcast of complex samples buffer. + * @param listener to add + */ + public void addListener(Listener listener) + { + mBroadcaster.addListener(listener); + } + + /** + * Removes the listener from receiving rebroadcast of complex samples. + * @param listener to remove + */ + public void removeListener(Listener listener) + { + mBroadcaster.removeListener(listener); + } + + @Override + public void reset() + { + //Not implemented + } + + @Override + public void start() + { + //Not implemented + } + + @Override + public void stop() + { + //Not implemented + } +} diff --git a/src/main/java/io/github/dsheirer/sample/complex/ComplexSamplesNativeBufferAdapter.java b/src/main/java/io/github/dsheirer/sample/complex/ComplexSamplesNativeBufferAdapter.java new file mode 100644 index 000000000..5597c317d --- /dev/null +++ b/src/main/java/io/github/dsheirer/sample/complex/ComplexSamplesNativeBufferAdapter.java @@ -0,0 +1,107 @@ +/* + * ***************************************************************************** + * Copyright (C) 2014-2022 Dennis Sheirer + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * **************************************************************************** + */ + +package io.github.dsheirer.sample.complex; + +import io.github.dsheirer.buffer.INativeBuffer; +import java.util.Iterator; + +/** + * Adapts a single complex samples buffer to be compatible as an INativeBuffer instance. + */ +public class ComplexSamplesNativeBufferAdapter implements INativeBuffer +{ + private ComplexSamples mComplexSamples; + private long mTimestamp = System.currentTimeMillis(); + + /** + * Constructs an instance + * @param complexSamples to adapt/convert to an INativeBuffer + */ + public ComplexSamplesNativeBufferAdapter(ComplexSamples complexSamples) + { + mComplexSamples = complexSamples; + } + + @Override + public Iterator iterator() + { + return new ComplexSamplesIterator(); + } + + @Override + public Iterator iteratorInterleaved() + { + return new InterleavedComplexSamplesIterator(); + } + + @Override + public int sampleCount() + { + return mComplexSamples.i().length; + } + + @Override + public long getTimestamp() + { + return mTimestamp; + } + + /** + * Simple iterator implementation. + */ + public class ComplexSamplesIterator implements Iterator + { + public boolean mHasNext = true; + + @Override + public boolean hasNext() + { + return mHasNext; + } + + @Override + public ComplexSamples next() + { + mHasNext = false; + return mComplexSamples; + } + } + + /** + * Simple iterator implementation. + */ + public class InterleavedComplexSamplesIterator implements Iterator + { + public boolean mHasNext = true; + + @Override + public boolean hasNext() + { + return mHasNext; + } + + @Override + public InterleavedComplexSamples next() + { + mHasNext = false; + return mComplexSamples.toInterleaved(); + } + } +} diff --git a/src/main/java/io/github/dsheirer/util/Dispatcher.java b/src/main/java/io/github/dsheirer/util/Dispatcher.java index 0489d7a71..31e014b8b 100644 --- a/src/main/java/io/github/dsheirer/util/Dispatcher.java +++ b/src/main/java/io/github/dsheirer/util/Dispatcher.java @@ -156,40 +156,47 @@ class Processor implements Runnable @Override public void run() { - mQueue.clear(); + try + { + mQueue.clear(); - E element; + E element; - while(mRunning.get()) - { - try + while(mRunning.get()) { - element = mQueue.take(); + try + { + element = mQueue.take(); - if(mPoisonPill == element) + if(mPoisonPill == element) + { + mRunning.set(false); + } + else if(element != null) + { + if(mListener == null) + { + throw new IllegalStateException("Listener for [" + mThreadName + "] is null"); + } + mListener.receive(element); + } + } + catch(InterruptedException e) { - mRunning.set(false); + mLog.error("Buffer processor thread was interrupted"); } - else if(element != null) + catch(Exception e) { - if(mListener == null) - { - throw new IllegalStateException("Listener for [" + mThreadName + "] is null"); - } - mListener.receive(element); + mLog.error("Error while processing element", e); } } - catch(InterruptedException e) - { - mLog.error("Buffer processor thread was interrupted"); - } - catch(Exception e) - { - mLog.error("Error while processing element", e); - } - } - mQueue.clear(); + mQueue.clear(); + } + catch(Throwable t) + { + mLog.error("Unexpected error thrown from the Dispatcher thread", t); + } } } }