Skip to content

Commit

Permalink
#1939 Duplicate call detector enhancements and unit testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dennis Sheirer committed Sep 15, 2024
1 parent 0cee16e commit 7a78b1f
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 86 deletions.
29 changes: 29 additions & 0 deletions src/main/java/io/github/dsheirer/audio/AudioSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.github.dsheirer.identifier.IdentifierCollection;
import io.github.dsheirer.identifier.IdentifierUpdateNotification;
import io.github.dsheirer.identifier.MutableIdentifierCollection;
import io.github.dsheirer.identifier.encryption.EncryptionKeyIdentifier;
import io.github.dsheirer.sample.Broadcaster;
import io.github.dsheirer.sample.Listener;
import java.util.Collection;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class AudioSegment implements Listener<IdentifierUpdateNotification>
private final static Logger mLog = LoggerFactory.getLogger(AudioSegment.class);
private BooleanProperty mComplete = new SimpleBooleanProperty(false);
private BooleanProperty mDuplicate = new SimpleBooleanProperty(false);
private BooleanProperty mEncrypted = new SimpleBooleanProperty(false);
private BooleanProperty mRecordAudio = new SimpleBooleanProperty(false);
private IntegerProperty mMonitorPriority = new SimpleIntegerProperty(Priority.DEFAULT_PRIORITY);
private ObservableSet<BroadcastChannel> mBroadcastChannels = FXCollections.observableSet(new HashSet<>());
Expand Down Expand Up @@ -129,6 +131,25 @@ public long getDuration()
return (mSampleCount / 8); //8 kHz audio generates 8 samples per millisecond
}

/**
* Indicates if the audio segment contains encrypted audio.
*
* @return encrypted property
*/
public BooleanProperty encryptedProperty()
{
return mEncrypted;
}

/**
* Indicates if this audio segment is encrypted
* @return true if encrypted.
*/
public boolean isEncrypted()
{
return mEncrypted.get();
}

/**
* The complete property is used by the audio segment producer to signal that the segment is complete and no
* additional audio or identifiers will be added to the segment.
Expand Down Expand Up @@ -420,6 +441,14 @@ public void addIdentifier(Identifier identifier)
{
mIdentifierCollection.update(identifier);

/**
* If we have a late-add encryption key, set the encrypted flag to true.
*/
if(identifier instanceof EncryptionKeyIdentifier eki)
{
mEncrypted.set(eki.isEncrypted());
}

List<Alias> aliases = mAliasList.getAliases(identifier);

for(Alias alias: aliases)
Expand Down
178 changes: 129 additions & 49 deletions src/main/java/io/github/dsheirer/audio/DuplicateCallDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.github.dsheirer.identifier.radio.RadioIdentifier;
import io.github.dsheirer.identifier.talkgroup.TalkgroupIdentifier;
import io.github.dsheirer.preference.UserPreferences;
import io.github.dsheirer.preference.duplicate.CallManagementPreference;
import io.github.dsheirer.preference.duplicate.ICallManagementProvider;
import io.github.dsheirer.sample.Listener;
import io.github.dsheirer.util.ThreadPool;
Expand All @@ -39,7 +38,8 @@
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,18 +52,41 @@
public class DuplicateCallDetector implements Listener<AudioSegment>
{
private final static Logger mLog = LoggerFactory.getLogger(DuplicateCallDetector.class);
private CallManagementPreference mCallManagementPreference;
private ICallManagementProvider mCallManagementProvider;
private Map<String,SystemDuplicateCallDetector> mDetectorMap = new HashMap();
protected Listener<AudioSegment> mDuplicateCallDetectionListener;

/**
* Constructs an instance.
* @param userPreferences to access the duplicate call detection preferences.
*/
public DuplicateCallDetector(UserPreferences userPreferences)
{
mCallManagementPreference = userPreferences.getDuplicateCallDetectionPreference();
this(userPreferences.getDuplicateCallDetectionPreference());
}

/**
* Constructs an instance.
* @param callManagementProvider to provide call management preferences.
*/
public DuplicateCallDetector(ICallManagementProvider callManagementProvider)
{
mCallManagementProvider = callManagementProvider;
}

/**
* Optional listener to be notified each time an audio segment is flagged as duplicate.
* @param listener to register
*/
public void setDuplicateCallDetectionListener(Listener<AudioSegment> listener)
{
mDuplicateCallDetectionListener = listener;
}

@Override
public void receive(AudioSegment audioSegment)
{
if(mCallManagementPreference.isDuplicateCallDetectionEnabled())
if(mCallManagementProvider.isDuplicateCallDetectionEnabled())
{
Identifier identifier = audioSegment.getIdentifierCollection()
.getIdentifier(IdentifierClass.CONFIGURATION, Form.SYSTEM, Role.ANY);
Expand All @@ -78,7 +101,7 @@ public void receive(AudioSegment audioSegment)

if(detector == null)
{
detector = new SystemDuplicateCallDetector(mCallManagementPreference);
detector = new SystemDuplicateCallDetector(mCallManagementProvider, system);
mDetectorMap.put(system, detector);
}

Expand All @@ -88,51 +111,99 @@ public void receive(AudioSegment audioSegment)
}
}

public static class SystemDuplicateCallDetector
/**
* System level duplicate call detector. Uses a scheduled executor to run every 25 ms to compare all ongoing call
* audio segments to detect duplicates.
*
* All audio segments remain in the queue until they are flagged as complete. While in the queue, each call is
* compared against the others to detect duplicates. Once all calls are either flagged as complete or flagged as
* duplicate and removed, the queue is empty and the monitoring is shutdown until a new audio segment arrives and
* then the monitoring starts again.
*/
public class SystemDuplicateCallDetector
{
private final LinkedTransferQueue<AudioSegment> mAudioSegmentQueue = new LinkedTransferQueue<>();
private final List<AudioSegment> mAudioSegments = new ArrayList<>();
private final AtomicBoolean mMonitoring = new AtomicBoolean();
private ScheduledFuture<?> mProcessorFuture;
private Lock mLock = new ReentrantLock();
private boolean mMonitoring = false;
private final ICallManagementProvider mCallManagementProvider;
private String mSystem;

/**
* Constructs an instance
* @param callManagementProvider to check for duplicate monitoring preferences
*/
public SystemDuplicateCallDetector(ICallManagementProvider callManagementProvider)
public SystemDuplicateCallDetector(ICallManagementProvider callManagementProvider, String system)
{
mCallManagementProvider = callManagementProvider;
mSystem = system;
}

/**
* Adds the audio segment to the monitoring queue.
* @param audioSegment to add
*/
public void add(AudioSegment audioSegment)
{
//Block on audio segment queue so that we don't interfere with monitoring shutdown
synchronized(mAudioSegmentQueue)
mLock.lock();

try
{
mAudioSegmentQueue.add(audioSegment);
startMonitoring();
}
finally
{
mLock.unlock();
}
}

/**
* Starts the call monitoring thread if it's not already running.
*
* Note: this method should only be called from a thread with the lock acquired.
*/
private void startMonitoring()
{
if(mMonitoring.compareAndSet(false, true))
if(!mMonitoring)
{
mProcessorFuture = ThreadPool.SCHEDULED.scheduleAtFixedRate(this::process,
0, 25, TimeUnit.MILLISECONDS);
mMonitoring = true;
}
}

/**
* Stops the call monitoring thread if the audio segements queue is empty.
*
* Note: this should only be called from a separate thread and not from within the scheduled monitoring thread
* because cancelling the scheduled timer from within the process() method would kill the thread without
* releasing the lock, causing a deadlock.
*/
private void stopMonitoring()
{
if(mMonitoring.compareAndSet(true, false))
mLock.lock();

try
{
if(mProcessorFuture != null)
//Recheck the audio segments queue to make sure we didn't slip in another audio segment before we can
//shut down the scheduled monitoring thread.
if(mMonitoring && mAudioSegments.isEmpty())
{
mProcessorFuture.cancel(true);
if(mProcessorFuture != null)
{
mProcessorFuture.cancel(true);
mProcessorFuture = null;
}

mMonitoring = false;
}
}
finally
{
mLock.unlock();
}
}

/**
Expand Down Expand Up @@ -179,49 +250,47 @@ public static boolean isDuplicate(List<Identifier> identifiers1, List<Identifier
{
for(Identifier identifier1: identifiers1)
{
if(identifier1 instanceof TalkgroupIdentifier)
if(identifier1 instanceof TalkgroupIdentifier tgId1)
{
int talkgroup1 = ((TalkgroupIdentifier)identifier1).getValue();
int tg1 = tgId1.getValue();

for(Identifier identifier2: identifiers2)
{
if(identifier2 instanceof TalkgroupIdentifier &&
((TalkgroupIdentifier)identifier2).getValue() == talkgroup1)
if(identifier2 instanceof TalkgroupIdentifier tgId2 && tgId2.getValue() == tg1)
{
return true;
}
else if(identifier2 instanceof PatchGroupIdentifier &&
((PatchGroupIdentifier)identifier2).getValue().getPatchGroup().getValue() == talkgroup1)
else if(identifier2 instanceof PatchGroupIdentifier pgId2 &&
pgId2.getValue().getPatchGroup().getValue() == tg1)
{
return true;
}
}
}
else if(identifier1 instanceof PatchGroupIdentifier)
else if(identifier1 instanceof PatchGroupIdentifier pgId1)
{
int talkgroup1 = ((PatchGroupIdentifier)identifier1).getValue().getPatchGroup().getValue();
int talkgroup1 = pgId1.getValue().getPatchGroup().getValue();

for(Identifier identifier2: identifiers2)
{
if(identifier2 instanceof TalkgroupIdentifier &&
((TalkgroupIdentifier)identifier2).getValue() == talkgroup1)
if(identifier2 instanceof TalkgroupIdentifier tgId2 && tgId2.getValue() == talkgroup1)
{
return true;
}
else if(identifier2 instanceof PatchGroupIdentifier &&
((PatchGroupIdentifier)identifier2).getValue().getPatchGroup().getValue() == talkgroup1)
else if(identifier2 instanceof PatchGroupIdentifier pgId2 &&
pgId2.getValue().getPatchGroup().getValue() == talkgroup1)
{
return true;
}
}
}
else if(identifier1 instanceof RadioIdentifier)
else if(identifier1 instanceof RadioIdentifier raId1)
{
int radio1 = ((RadioIdentifier)identifier1).getValue();
int radio1 = raId1.getValue();

for(Identifier identifier2: identifiers2)
{
if(identifier2 instanceof RadioIdentifier && ((RadioIdentifier)identifier2).getValue() == radio1)
if(identifier2 instanceof RadioIdentifier raId2 && raId2.getValue() == radio1)
{
return true;
}
Expand All @@ -237,6 +306,8 @@ else if(identifier1 instanceof RadioIdentifier)
*/
private void process()
{
mLock.lock();

try
{
//Transfer in newly arrived audio segments
Expand All @@ -254,6 +325,19 @@ private void process()
return complete;
});

//Remove any encrypted audio segments.
mAudioSegments.removeIf(audioSegment -> {
boolean encrypted = audioSegment.isEncrypted();

if(encrypted)
{
audioSegment.decrementConsumerCount();
}


return encrypted;
});

//Only check for duplicates if there is more than one call
if(mAudioSegments.size() > 1)
{
Expand All @@ -264,7 +348,7 @@ private void process()
{
AudioSegment current = mAudioSegments.get(currentIndex);

if(!current.isDuplicate())
if(current.hasAudio() && !current.isDuplicate())
{
int checkIndex = currentIndex + 1;

Expand All @@ -279,6 +363,12 @@ private void process()
toCheck.setDuplicate(true);
toCheck.decrementConsumerCount();
duplicates.add(toCheck);

//Notify optional listener that we flagged the call as duplicate.
if(mDuplicateCallDetectionListener != null)
{
mDuplicateCallDetectionListener.receive(toCheck);
}
}
}

Expand All @@ -292,33 +382,23 @@ private void process()
mAudioSegments.removeAll(duplicates);
}

//Finally, if the audio segment queue is empty, shutdown montitoring until a new segment arrives
//Finally, if the audio segment queue is now empty, shutdown monitoring until a new segment arrives.
//The monitor shutdown method has to be called on a separate thread so that we don't kill our current
// thread and fail to release the lock.
if(mAudioSegments.isEmpty())
{
//Block on the audio segment queue so that we can shutdown before any new segments are added, and
//allow the add(segment) to restart monitoring as soon as needed.
synchronized(mAudioSegmentQueue)
{
if(mAudioSegmentQueue.isEmpty())
{
try
{
stopMonitoring();
}
catch(Exception e)
{
mLog.error("Unexpected error during duplicate audio segment monitoring shutdown", e);
//Do nothing, we got interrupted
}
}
}
ThreadPool.CACHED.submit(this::stopMonitoring);
}
}
catch(Throwable t)
{
mLog.error("Unknown error while processing audio segments for duplicate call detection. Please report " +
"this to the developer.", t);
}
finally
{
mLock.unlock();
}
}
}
}
Loading

0 comments on commit 7a78b1f

Please sign in to comment.