Skip to content

Commit

Permalink
master: rename AgentInfo to MasterAgentInfo
Browse files Browse the repository at this point in the history
Preparation for making agent querying functions public.
  • Loading branch information
vs49688 committed Dec 3, 2020
1 parent d6c904c commit 7b483fc
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private RunningJob(UUID uuid, Job job, JobAttempt att, NetworkJob networkJob, Ag
private final LinkedBlockingDeque<_AgentMessage> agentMessages;

private final AAAAA aaaaa;
private final ConcurrentHashMap<UUID, AgentInfo> allAgents;
private final ConcurrentHashMap<UUID, MasterAgentInfo> allAgents;
private final Orphanage orphanage;

private final _AgentListener agentListener;
Expand Down Expand Up @@ -243,7 +243,7 @@ public Optional<MessageOperation> processAgentMessage(long tag, AMQPMessage amsg
return Optional.of(MessageOperation.Reject);
}

AgentInfo ai = allAgents.get(amsg.message.getAgentUUID());
MasterAgentInfo ai = allAgents.get(amsg.message.getAgentUUID());
if(ai == null) {
/* Not anyone we know. */
return Optional.of(MessageOperation.Reject);
Expand Down Expand Up @@ -305,7 +305,7 @@ private void runLater(String name, Runnable r) {
runLater(name, r, false);
}

private AgentInfo getAgentInfo(UUID uuid) {
private MasterAgentInfo getMasterAgentInfo(UUID uuid) {
return allAgents.getOrDefault(uuid, null);
}

Expand Down Expand Up @@ -351,8 +351,8 @@ public boolean tick() {
/*
* Register an agent with the master, creating its state machine et al.
*/
private AgentInfo registerAgent(DefaultAgentState as, Resource res, Optional<Actuator> act, boolean initial) {
AgentInfo ai = new AgentInfo(as.getUUID(), res, act, new ReferenceAgent(as, agentListener), as);
private MasterAgentInfo registerAgent(DefaultAgentState as, Resource res, Optional<Actuator> act, boolean initial) {
MasterAgentInfo ai = new MasterAgentInfo(as.getUUID(), res, act, new ReferenceAgent(as, agentListener), as);

allAgents.put(ai.uuid, ai);

Expand All @@ -375,7 +375,7 @@ private void checkOrphanage() {
for(Resource r : agentMap.keySet()) {
CompletableFuture<Actuator> af = aaaaa.getOrLaunchActuator(r);
for(DefaultAgentState as : agentMap.get(r)) {
AgentInfo ai = registerAgent(as, r, Optional.empty(), false);
MasterAgentInfo ai = registerAgent(as, r, Optional.empty(), false);
heart.onAgentCreate(as.getUUID(), Instant.now());
heart.resetPingTimer(as.getUUID());
af.handle((a, t) -> {
Expand Down Expand Up @@ -404,7 +404,7 @@ private void checkOrphanage() {
}
}

private NetworkJob buildNetworkJob(JobAttempt att, Job job, AgentInfo ai) {
private NetworkJob buildNetworkJob(JobAttempt att, Job job, MasterAgentInfo ai) {
/* FIXME: handle cert path, etc. */
return nimrod.getAssignmentStatus(ai.resource, experiment)
.map(u -> MsgUtils.resolveJob(att.getUUID(), job, Task.Name.Main, u.uri))
Expand All @@ -423,7 +423,7 @@ private void resyncJobAttempts() {
activeAttempts.forEach((job, value) -> value.stream()
.filter(att -> att.getStatus() == JobAttempt.Status.RUNNING && !runningJobs.containsKey(att.getAgentUUID()))
.forEach(att -> {
AgentInfo ai = allAgents.get(att.getAgentUUID());
MasterAgentInfo ai = allAgents.get(att.getAgentUUID());
/* If there's no agent associated, something screwy's going on. Fail the attempt and let it reschedule. */
if(ai == null) {
/* FIXME: What if the job scheduler actually knows about it and somethings really gone screwy? */
Expand Down Expand Up @@ -536,7 +536,7 @@ private void processAgents(State state) {
heart.tick(Instant.now());

{
List<AgentInfo> ais = heartOps.toExpire.stream()
List<MasterAgentInfo> ais = heartOps.toExpire.stream()
.map(allAgents::remove)
.filter(Objects::nonNull)
.collect(Collectors.toList());
Expand All @@ -551,7 +551,7 @@ private void processAgents(State state) {
}

/* Expire an agent. */
private void doExpire(AgentInfo ai) {
private void doExpire(MasterAgentInfo ai) {
if(ai.state.getState() == Agent.State.WAITING_FOR_HELLO) {
/*
* We're still WAITING_FOR_HELLO, ask the actuator what's going on.
Expand Down Expand Up @@ -601,7 +601,7 @@ private MessageOperation doProcessAgentMessage2(_AgentMessage _msg) throws Illeg
AgentMessage msg = _msg.msg;
LOGGER.debug("doProcessAgentMessage({}, {})", msg.getAgentUUID(), msg.getType().typeString);

AgentInfo ai = getAgentInfo(msg.getAgentUUID());
MasterAgentInfo ai = getMasterAgentInfo(msg.getAgentUUID());
if(ai == null) {
LOGGER.warn("Message from unknown agent {}, ignoring...", msg.getAgentUUID());
return MessageOperation.Ack;
Expand Down Expand Up @@ -639,7 +639,7 @@ private MessageOperation doProcessAgentMessage2(_AgentMessage _msg) throws Illeg
private State stoppingProc(State state, Mode mode) {
if(mode == Mode.Enter) {
aaaaa.shutdown();
for(AgentInfo ai : allAgents.values()) {
for(MasterAgentInfo ai : allAgents.values()) {
try {
LOGGER.trace("Terminating agent '{}'", ai.uuid);
ai.instance.terminate();
Expand Down Expand Up @@ -744,10 +744,10 @@ public void recordCommandResult(JobAttempt att, CommandResult.CommandResultStatu
* If an agent has launched successfully and the actuator hasn't, the agent is orphaned.
* If an agent has launched successfully and the actuator reports a launch error, the agent is killed.
*
* @param agentInfo An array of {@link AgentInfo} structures.
* @param masterAgentInfo An array of {@link MasterAgentInfo} structures.
* @param launchRequest The launch request.
*/
private void launchHandler(AgentInfo[] agentInfo, LaunchRequest launchRequest) {
private void launchHandler(MasterAgentInfo[] masterAgentInfo, LaunchRequest launchRequest) {
assert launchRequest.actuatorFuture.isDone();

/* NB: Submitted by a child of this, so this won't fail. */
Expand All @@ -762,7 +762,7 @@ private void launchHandler(AgentInfo[] agentInfo, LaunchRequest launchRequest) {

for(int i = 0; i < lrs.length; ++i) {
Actuator.LaunchResult lr = lrs[i];
AgentInfo ai = agentInfo[i];
MasterAgentInfo ai = masterAgentInfo[i];
ai.actuator.complete(act);

if(lr.t != null) {
Expand Down Expand Up @@ -810,24 +810,24 @@ public UUID[] launchAgents(Resource res, int num) {
UUID[] uuids = AAAAA.generateRandomUUIDs(num);

/* Create the agents before doing anything, saves us from having to keep a list of pending connections. */
AgentInfo[] agentInfo = new AgentInfo[num];
MasterAgentInfo[] masterAgentInfo = new MasterAgentInfo[num];
for(int i = 0; i < num; ++i) {
DefaultAgentState as = new DefaultAgentState();
as.setUUID(uuids[i]);
/* FIXME: Generate this properly. */
as.setSecretKey(UUID.randomUUID().toString().replace("-", ""));
agentInfo[i] = registerAgent(as, res, Optional.empty(), true);
masterAgentInfo[i] = registerAgent(as, res, Optional.empty(), true);
}

/* NB: Will not block. */
LaunchRequest rq = aaaaa.launchAgents(res, Arrays.stream(agentInfo)
LaunchRequest rq = aaaaa.launchAgents(res, Arrays.stream(masterAgentInfo)
.map(ai -> Actuator.Request.forAgent(ai.uuid, ai.state.getSecretKey()))
.toArray(Actuator.Request[]::new)
);

rq.launchResults.thenAcceptAsync(lrs -> runLater(
"launchAgents->handler",
() -> launchHandler(agentInfo, rq), true), aaaaa.getExecutorService()
() -> launchHandler(masterAgentInfo, rq), true), aaaaa.getExecutorService()
);

return uuids;
Expand Down Expand Up @@ -956,7 +956,7 @@ public void send(Agent agent, AgentMessage.Builder<?> msg) throws IOException {

@Override
public void onStateChange(Agent agent, Agent.State oldState, Agent.State newState) {
AgentInfo ai = getAgentInfo(agent.getUUID());
MasterAgentInfo ai = getMasterAgentInfo(agent.getUUID());
assert agent == ai.instance;

LOGGER.debug("Agent {}: State change from {} -> {}", agent.getUUID(), oldState, newState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class AgentInfo {
class MasterAgentInfo {

public final UUID uuid;
public final Resource resource;
public final CompletableFuture<Actuator> actuator;
public final ReferenceAgent instance;
public final DefaultAgentState state;

public AgentInfo(UUID uuid, Resource resource, Optional<Actuator> actuator, ReferenceAgent instance, DefaultAgentState state) {
public MasterAgentInfo(UUID uuid, Resource resource, Optional<Actuator> actuator, ReferenceAgent instance, DefaultAgentState state) {
this.uuid = uuid;
this.resource = resource;
this.actuator = new CompletableFuture<>();
Expand All @@ -61,7 +61,7 @@ public boolean equals(Object obj) {
if(getClass() != obj.getClass()) {
return false;
}
final AgentInfo other = (AgentInfo)obj;
final MasterAgentInfo other = (MasterAgentInfo)obj;
return Objects.equals(this.uuid, other.uuid);
}

Expand Down

0 comments on commit 7b483fc

Please sign in to comment.