Skip to content

Commit

Permalink
Showing 5 changed files with 197 additions and 3 deletions.
21 changes: 18 additions & 3 deletions aeron-client/src/main/cpp/Aeron.cpp
Original file line number Diff line number Diff line change
@@ -58,14 +58,29 @@ Aeron::Aeron(Context &context) :
CncFileDescriptor::clientLivenessTimeout(m_cncBuffer),
context.m_publicationConnectionTimeout),
m_idleStrategy(IDLE_SLEEP_MS),
m_conductorRunner(m_conductor, m_idleStrategy, m_context.m_exceptionHandler)
m_conductorRunner(m_conductor, m_idleStrategy, m_context.m_exceptionHandler),
m_conductorInvoker(m_conductor, m_context.m_exceptionHandler)
{
m_conductorRunner.start();
if (m_context.m_useConductorAgentInvoker)
{
m_conductorInvoker.start();
}
else
{
m_conductorRunner.start();
}
}

Aeron::~Aeron()
{
m_conductorRunner.close();
if (m_context.m_useConductorAgentInvoker)
{
m_conductorInvoker.close();
}
else
{
m_conductorRunner.close();
}

// memory mapped files should be free'd by the destructor of the shared_ptr
}
12 changes: 12 additions & 0 deletions aeron-client/src/main/cpp/Aeron.h
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
#include "ClientConductor.h"
#include "concurrent/SleepingIdleStrategy.h"
#include "concurrent/AgentRunner.h"
#include "concurrent/AgentInvoker.h"
#include "Publication.h"
#include "Subscription.h"
#include "Context.h"
@@ -222,6 +223,16 @@ class Aeron
return m_toDriverRingBuffer.nextCorrelationId();
}

/**
* Return the AgentInvoker for the client conductor.
*
* @return AgenInvoker for the conductor.
*/
inline AgentInvoker<ClientConductor>& conductorAgentInvoker()
{
return m_conductorInvoker;
}

private:
std::random_device m_randomDevice;
std::default_random_engine m_randomEngine;
@@ -244,6 +255,7 @@ class Aeron
ClientConductor m_conductor;
SleepingIdleStrategy m_idleStrategy;
AgentRunner<ClientConductor, SleepingIdleStrategy> m_conductorRunner;
AgentInvoker<ClientConductor> m_conductorInvoker;

MemoryMappedFile::ptr_t mapCncFile(Context& context);
};
1 change: 1 addition & 0 deletions aeron-client/src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ SET(HEADERS
command/SubscriptionMessageFlyweight.h
command/DestinationMessageFlyweight.h
concurrent/AgentRunner.h
concurrent/AgentInvoker.h
concurrent/Atomic64.h
concurrent/AtomicBuffer.h
concurrent/AtomicCounter.h
13 changes: 13 additions & 0 deletions aeron-client/src/main/cpp/Context.h
Original file line number Diff line number Diff line change
@@ -284,6 +284,18 @@ class Context
return *this;
}

/**
* Set whether to use an invoker to control the conductor agent or spawn a thread.
*
* @param useConductorAgentInvoker to use an invoker or not.
* @return reference to this Context instance
*/
inline this_t& useConductorAgentInvoker(bool useConductorAgentInvoker)
{
m_useConductorAgentInvoker = useConductorAgentInvoker;
return *this;
}

inline static std::string tmpDir()
{
#if defined(_MSC_VER)
@@ -350,6 +362,7 @@ class Context
long m_mediaDriverTimeout = NULL_TIMEOUT;
long m_resourceLingerTimeout = NULL_TIMEOUT;
long m_publicationConnectionTimeout = NULL_TIMEOUT;
bool m_useConductorAgentInvoker = false;
};

}
153 changes: 153 additions & 0 deletions aeron-client/src/main/cpp/concurrent/AgentInvoker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2014-2017 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef AERON_AGENTINVOKER_H
#define AERON_AGENTINVOKER_H

#include <util/Exceptions.h>
#include <functional>
#include <thread>
#include <atomic>
#include <concurrent/logbuffer/TermReader.h>

namespace aeron {

namespace concurrent {

template<typename Agent>
class AgentInvoker
{
public:
AgentInvoker(Agent& agent, logbuffer::exception_handler_t& exceptionHandler) :
m_agent(agent),
m_exceptionHandler(exceptionHandler),
m_isStarted(false),
m_isRunning(false),
m_isClosed(false)
{
}

/**
* Has the Agent been started?
*
* @return has the Agent been started?
*/
inline bool isStarted()
{
return m_isStarted;
}

/**
* Is the Agent running?
*
* @return is the Agent been started successfully and not closed?
*/
inline bool isRunning()
{
return m_isRunning;
}

/**
* Has the Agent been closed?
*
* @return has the Agent been closed?
*/
inline bool isClosed()
{
return m_isClosed;
}

/**
* Mark the invoker as started and call the Agent::onStart() method.
* <p>
* Startup logic will only be performed once.
*/
inline void start()
{
try
{
if (!m_isStarted)
{
m_isStarted = true;
m_agent.onStart();
m_isRunning = true;
}
}
catch (const util::SourcedException &exception)
{
m_exceptionHandler(exception);
close();
}
}

/**
* Invoke the Agent::doWork() method and return the work count.
*
* If not successfully started or after closed then this method will return without invoking the {@link Agent}.
*
* @return the work count for the Agent::doWork() method.
*/
inline int invoke()
{
int workCount = 0;

if (m_isRunning)
{
try
{
workCount = m_agent.doWork();
}
catch (const util::SourcedException &exception)
{
m_exceptionHandler(exception);
}
}

return workCount;
}

/**
* Mark the invoker as closed and call the Agent::onClose() logic for clean up.
*
* The clean up logic will only be performed once.
*/
inline void close()
{
try
{
if (!m_isClosed)
{
m_isRunning = false;
m_isClosed = true;
m_agent.onClose();
}
}
catch (const util::SourcedException &exception)
{
m_exceptionHandler(exception);
}
}

private:
Agent& m_agent;
logbuffer::exception_handler_t& m_exceptionHandler;
bool m_isStarted;
bool m_isRunning;
bool m_isClosed;
};

}}

#endif

0 comments on commit 33c1033

Please sign in to comment.