Skip to content

Commit

Permalink
Merge pull request #54 from chenhao7253886/master
Browse files Browse the repository at this point in the history
query and pull load select an backend that does't belong to the clust…
  • Loading branch information
chenhao7253886 authored Aug 29, 2017
2 parents 7c278c9 + eab3457 commit 029c0bf
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
7 changes: 6 additions & 1 deletion fe/src/com/baidu/palo/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public enum JobState {

private long id;
private long dbId;
private String clusterName;
private long tableId;
private BrokerDesc brokerDesc;
private String exportPath;
Expand Down Expand Up @@ -286,7 +287,7 @@ private List<Coordinator> genCoordinators(List<PlanFragment> fragments, List<Sca
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + i, uuid.getLeastSignificantBits());

Coordinator coord = new Coordinator(
queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode));
queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), clusterName);
coords.add(coord);
this.coordList.add(coord);
}
Expand Down Expand Up @@ -355,6 +356,10 @@ public long getId() {
return id;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public long getDbId() {
return dbId;
}
Expand Down
7 changes: 5 additions & 2 deletions fe/src/com/baidu/palo/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public class Coordinator {
private TResourceInfo tResourceInfo;
private boolean needReport;

private String clusterName;

// Used for query
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
Expand All @@ -180,11 +181,12 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.tResourceInfo = new TResourceInfo(context.getUser(),
context.getSessionVariable().getResourceGroup());
this.needReport = context.getSessionVariable().isReportSucc();
this.clusterName = context.getClusterName();
}

// Used for pull load task coordinator
public Coordinator(TUniqueId queryId, DescriptorTable descTable,
List<PlanFragment> fragments, List<ScanNode> scanNodes) {
List<PlanFragment> fragments, List<ScanNode> scanNodes, String cluster) {
this.isBlockQuery = true;
this.queryId = queryId;
this.descTable = descTable.toThrift();
Expand All @@ -194,6 +196,7 @@ public Coordinator(TUniqueId queryId, DescriptorTable descTable,
this.queryGlobals.setNow_string(DATE_FORMAT.format(new Date()));
this.tResourceInfo = new TResourceInfo("", "");
this.needReport = true;
this.clusterName = cluster;
}

public TUniqueId getQueryId() {
Expand Down Expand Up @@ -251,7 +254,7 @@ private void prepare() {
queryProfile.addChild(fragmentProfile.get(i));
}

this.idToBackend = Catalog.getCurrentSystemInfo().getIdToBackend();
this.idToBackend = Catalog.getCurrentSystemInfo().getClusterIdToBackend(clusterName);
if (LOG.isDebugEnabled()) {
LOG.debug("idToBackend size={}", idToBackend.size());
for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
Expand Down
17 changes: 16 additions & 1 deletion fe/src/com/baidu/palo/system/SystemInfoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,22 @@ public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needA
public ImmutableMap<Long, Backend> getIdToBackend() {
return idToBackendRef.get();
}



public ImmutableMap<Long, Backend> getClusterIdToBackend(String cluster) {
if (Strings.isNullOrEmpty(cluster)) {
return idToBackendRef.get();
}

Map<Long, Backend> retMaps = Maps.newHashMap();
for (Backend backend : idToBackendRef.get().values().asList()) {
if (cluster.equals(backend.getOwnerClusterName())) {
retMaps.put(backend.getId(), backend);
}
}
return ImmutableMap.copyOf(retMaps);
}

public long getBackendReportVersion(long backendId) {
AtomicLong atomicLong = null;
if ((atomicLong = idToReportVersionRef.get().get(backendId)) == null) {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/task/PullLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void executeOnce() throws InternalException {
UUID uuid = UUID.randomUUID();
executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
curCoordinator = new Coordinator(executeId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes());
planner.getFragments(), planner.getScanNodes(), db.getClusterName());
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
}
Expand Down

0 comments on commit 029c0bf

Please sign in to comment.