Skip to content
This repository has been archived by the owner on Jul 7, 2020. It is now read-only.

Alias #267

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ public interface AliasManager {

void addAlias(String alias, List<String> jobs);

void deleteAlias(String alias);
void deleteAlias(String alias) throws Exception;

List<String> aliasToJobs(String alias);

List<String> getJobs(String alias);

String getLikelyAlias(String jobid);

void putAlias(String alias, List<String> jobs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,83 @@
*/
package com.addthis.hydra.job.alias;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.io.StringWriter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;

import com.addthis.hydra.job.store.DataStoreUtil;
import com.addthis.hydra.job.store.SpawnDataStore;
import com.addthis.hydra.query.spawndatastore.AliasBiMap;
import com.addthis.hydra.query.spawndatastore.AliasCache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class AliasManagerImpl implements AliasManager {

private static final Logger log = LoggerFactory.getLogger(AliasManagerImpl.class);
private Map<String, String> aliases;
/* This SpawnDataStore must be the same type (zookeeper/priam) between Spawn and Mqmaster. This should
* be guaranteed by the implementation of DataStoreUtil. */
public static final String ALIAS_PATH = "/query/alias";
private final SpawnDataStore spawnDataStore;
private final Map<String, List<String>> alias2jobs;
private final Map<String, String> job2alias;
private final ObjectMapper mapper;
private final ReentrantLock mapLock;
private final AliasCache ac;

private AliasBiMap aliasBiMap;
public AliasManagerImpl() throws Exception{
this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore();
this.mapLock = new ReentrantLock();
this.mapper = new ObjectMapper();
this.alias2jobs = new HashMap<>();
this.job2alias = new HashMap<>();
this.ac = new AliasCache();
}

public AliasManagerImpl(SpawnDataStore spawnDataStore) {
this.aliasBiMap = new AliasBiMap(spawnDataStore);
aliasBiMap.loadCurrentValues();
public AliasManagerImpl(SpawnDataStore spawnDataStore) throws Exception {
this.spawnDataStore = spawnDataStore;
this.mapLock = new ReentrantLock();
this.mapper = new ObjectMapper();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be declared statically.

this.alias2jobs = new HashMap<>();
this.job2alias = new HashMap<>();
this.ac = new AliasCache();
}

/**
* Returns a map describing alias name => jobIds
*/
public Map<String, List<String>> getAliases() {
return aliasBiMap.viewAliasMap();
aliases = spawnDataStore.getAllChildren(ALIAS_PATH);
Map<String, List<String>> alias2Jobs = new HashMap<>();
for (Map.Entry<String, String> aliasEntry : aliases.entrySet()) {
String key = aliasEntry.getKey();
List<String> jobs = decodeAliases(aliasEntry.getValue());
alias2Jobs.put(key, jobs);
}
return alias2Jobs;
}

public List<String> aliasToJobs(String alias) {
return aliasBiMap.getJobs(alias);
return getJobs(alias);
}

/**
Expand All @@ -51,14 +99,161 @@ public List<String> aliasToJobs(String alias) {
*/
public void addAlias(String alias, List<String> jobs) {
if (jobs.size() > 0) {
aliasBiMap.putAlias(alias, jobs);
putAlias(alias, jobs);
} else {
log.warn("Ignoring empty jobs addition for alias: {}", alias);
}
}

public void deleteAlias(String alias) {
aliasBiMap.deleteAlias(alias);
public void putAlias(String alias, List<String> jobs) {
mapLock.lock();
try {
alias2jobs.put(alias, jobs);
job2alias.put(jobs.get(0), alias);
StringWriter sw = new StringWriter();
spawnDataStore.putAsChild(ALIAS_PATH, alias, mapper.writeValueAsString(jobs));
} catch (Exception e) {
log.warn("failed to put alias: {}", alias, e);
throw Throwables.propagate(e);
} finally {
mapLock.unlock();
}
}

/**
* Get all jobIds for a given alias
*
* @param alias The alias to check
* @return A list of jobIds, possible null
*/
public List<String> getJobs(String alias) {
refreshAlias(alias);
mapLock.lock();
try {
return alias2jobs.get(alias);
} finally {
mapLock.unlock();
}
}

/**
* Refresh an alias based on datastore
*
* @param alias The alias to refresh
*/
private void refreshAlias(String alias) {
try {
updateAlias(alias, this.spawnDataStore.getChild(ALIAS_PATH, alias));
} catch (ExecutionException e) {
log.error("Failed to refresh alias: {}", alias, e);
} catch (Exception e) {
log.error("Unexpected error while refreshing alias {}", alias, e);
}
}

/**
* If jobs is available, update jobs in two maps based on the given alias
* Otherwise, delete the job for a given alias from two maps
*
* @param alias The alias key to check
* @param jobs The jobs to be updated for alias key
* @return String The job that was updated
*/
@Nullable private String updateAlias(String alias, @Nullable String jobs) throws Exception {
if (Strings.isNullOrEmpty(alias)) {
log.warn("Ignoring alias {} since it is null or empty ", alias);
return jobs;
}
if (Strings.isNullOrEmpty(jobs)) {
log.warn("Ignoring alias {} since there are no jobs and delete alias from two maps", alias);
deleteAlias(alias);
return jobs;
}
List<String> jobList = decodeAliases(jobs);
mapLock.lock();
try {
alias2jobs.put(alias, jobList);
job2alias.put(jobList.get(0), alias);
} finally {
mapLock.unlock();
}
return jobs;
}

@VisibleForTesting
protected List<String> decodeAliases(@Nonnull String data) {
try {
return mapper.readValue(data, new TypeReference<List<String>>() {});
} catch (IOException e) {
log.warn("Failed to decode data", e);
return new ArrayList<>(0);
}
}

/**
* Delete the data for a given alias
*
* @param alias The alias to check
*/
public void deleteAlias(String alias) throws Exception {
spawnDataStore.deleteChild(ALIAS_PATH, alias);
if( !Strings.isNullOrEmpty(spawnDataStore.getChild(ALIAS_PATH, alias))) {
log.error("Fail to delete alias {} from spawn datastore", alias);
return;
}
mapLock.lock();
try {
List<String> jobs = alias2jobs.get(alias);
alias2jobs.remove(alias);
if ((jobs != null) && !jobs.isEmpty()) {
for (String job : jobs) {
String aliasVal = job2alias.get(job);
if (Objects.equals(aliasVal, alias)) {
job2alias.remove(job);
}
}
}
} finally {
mapLock.unlock();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you copy the maps first? What happens if the deleteChild call below fails? You now have a bad state.

ac.deleteAlias(alias);
}

/**
* Get an alias for a particular jobId
*
* @param jobid The jobId to check
* @return One of the aliases for that job
*/
public String getLikelyAlias(String jobid) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why likely alias? Wouldn't this be the alias?

mapLock.lock();
try {
String tmpAlias = job2alias.get(jobid);
if (tmpAlias != null) {
// Check to see if the alias has been deleted
checkAlias(jobid, tmpAlias);
}
return job2alias.get(jobid);
} finally {
mapLock.unlock();
}
}

/**
* Test a job/alias pair to see if an alias has disappeared
*
* @param job The job to test
* @param alias The alias to check
*/
private void checkAlias(String job, String alias) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method also has unclear side effects since it removes job from job2alias when the method name is called checkAlias

mapLock.lock();
try {
if (!alias2jobs.containsKey(alias) && job2alias.get(job).equals(alias)) {
job2alias.remove(job);
}
} finally {
mapLock.unlock();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.slf4j.Logger;

/**
* A cache implementation that never blocks unless there is no data for a given ID. Stale values are refreshed asynchronously
* and the old value is returned in the mean time.
*
* @param <T> The class that will be stored in the cache
*/
public abstract class AvailableCache<T> implements AutoCloseable {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(AvailableCache.class);

/* A LoadingCache used to save fetched objects */
private final LoadingCache<String, Optional<T>> loadingCache;
Expand Down Expand Up @@ -67,17 +72,24 @@ public AvailableCache(long refreshMillis, long expireMillis, int maxSize, int fe
if (fetchThreads <= 0) {
fetchThreads = 2;
}
cacheBuilder.removalListener(new RemovalListener<Optional<T>, Optional<T>>() {
@Override
public void onRemoval(RemovalNotification<Optional<T>, Optional<T>> notification) {
log.info("alias {} and its job {} removed from cache", notification.getKey(), notification.getValue());
}
});
executor = new ThreadPoolExecutor(
fetchThreads, fetchThreads, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("avail-cache-%d").setDaemon(true).build());
//noinspection unchecked

this.loadingCache = cacheBuilder.build(new CacheLoader<String, Optional<T>>() {
/**
* If refreshAfterWrite is enabled, this method is called after returning the old value.
* The new value will be inserted into the cache when the load() operation completes.
*/
@Override
public ListenableFuture<Optional<T>> reload(final String key, Optional<T> oldValue) {
public ListenableFuture<Optional<T>> reload(final String key, Optional<T> oldValue) throws Exception {
ListenableFutureTask<Optional<T>> task = ListenableFutureTask.create(() -> load(key));
executor.execute(task);
return task;
Expand All @@ -90,6 +102,10 @@ public Optional<T> load(String key) throws Exception {
});
}

public LoadingCache<String, Optional<T>> getLoadingCache() {
return loadingCache;
}

/**
* A possibly-lengthy operation to fetch the canonical value for a given id, such as by reading from a SpawnDataStore
*
Expand Down Expand Up @@ -118,6 +134,10 @@ public void clear() {
loadingCache.invalidateAll();
}

public void cleanUp() {
loadingCache.cleanUp();
}

@Override public void close() throws Exception {
MoreExecutors.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import com.addthis.basis.util.Parameter;

import com.addthis.hydra.query.spawndatastore.AliasBiMap;
import com.addthis.hydra.job.alias.AliasManagerImpl;

import com.google.common.collect.Lists;

Expand All @@ -50,7 +50,7 @@ public class DataStoreUtil {
/* A list of datastore paths with values that should be cutover */
private static final List<String> pathsToImport = Arrays.asList(SPAWN_QUEUE_PATH, SPAWN_BALANCE_PARAM_PATH, SPAWN_HOST_FAIL_WORKER_PATH);
/* A list of datastore parent nodes with children that should be cutover */
private static final List<String> parentsToImport = Arrays.asList(SPAWN_COMMON_COMMAND_PATH, SPAWN_COMMON_MACRO_PATH, SPAWN_JOB_CONFIG_PATH, AliasBiMap.ALIAS_PATH, SPAWN_COMMON_ALERT_PATH);
private static final List<String> parentsToImport = Arrays.asList(SPAWN_COMMON_COMMAND_PATH, SPAWN_COMMON_MACRO_PATH, SPAWN_JOB_CONFIG_PATH, AliasManagerImpl.ALIAS_PATH, SPAWN_COMMON_ALERT_PATH);
/* A list of nodes beneath each job node */
private static final List<String> jobParametersToImport = Arrays.asList("config", "queryconfig", "tasks", "alerts");
/* A list of properties of certain job nodes that should be imported as flat values rather than children -- necessary for certain kafka broker info */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ protected void writeQuery(ChannelHandlerContext ctx, Query query, ChannelPromise
}
}

private List<String> expandAlias(String jobId) {
private List<String> expandAlias(String jobId) throws ExecutionException {
if (spawnDataStoreHandler != null) {
return spawnDataStoreHandler.expandAlias(jobId);
} else {
Expand Down
Loading