reformat

Pengzna committed Aug 4, 2024
1 parent f3c819d commit 1a1458d
Showing 74 changed files with 986 additions and 829 deletions.
@@ -465,7 +465,8 @@ private void queryAnd2Queue() {
/**
* Multi-thread query
*
* @param Point the start query point; follow-up queries use this point's value as the next query condition to iterate
* @param Point the start query point; follow-up queries use this point's value as the next
* query condition to iterate
* @param Scancount the number of threads allowed to start
* @throws IOException
* @throws InterruptedException
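The Javadoc above documents a multi-thread scan driver: start Scancount worker threads and let each one keep issuing queries, feeding the last returned point value in as the condition for its next query. Below is a minimal, self-contained sketch of that follow-up pattern; the Point record, the fetchNextBatch callback and the queue of start points are hypothetical stand-ins, not the actual classes in this file.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class MultiThreadScanSketch {

    /** Hypothetical stand-in for the "point" the Javadoc refers to. */
    public record Point(long value) {}

    /**
     * Starts scanCount workers. Each worker takes a start point from the queue and keeps
     * querying, using the last value of the previous batch as the next query condition.
     */
    public static void scan(BlockingQueue<Point> startPoints, int scanCount,
                            Function<Point, List<Point>> fetchNextBatch)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(scanCount);
        for (int i = 0; i < scanCount; i++) {
            pool.submit(() -> {
                Point cursor;
                while ((cursor = startPoints.poll()) != null) {
                    List<Point> batch = fetchNextBatch.apply(cursor);
                    while (!batch.isEmpty()) {
                        cursor = batch.get(batch.size() - 1); // follow up from the last value
                        batch = fetchNextBatch.apply(cursor);
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
    }
}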
24 changes: 12 additions & 12 deletions hugegraph-store/hg-store-cli/src/main/resources/log4j2.xml
@@ -26,8 +26,8 @@

<appenders>
<Console name="console" target="SYSTEM_OUT">
<ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/>
<ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
<PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" />
<!--JsonLayout compact="false" locationInfo="true">
<KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
</JsonLayout-->
@@ -37,24 +37,24 @@
<RollingRandomAccessFile name="file" fileName="${LOG_PATH}/${FILE_NAME}.log"
filePattern="${LOG_PATH}/$${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"
bufferedIO="false" immediateFlush="true">
<ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/>
<ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" />
<PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" />
<!--JsonLayout compact="true" eventEol="true" complete="true" locationInfo="true">
<KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/>
</JsonLayout-->
<!-- Trigger after exceeding 1 day or 50MB -->
<Policies>
<SizeBasedTriggeringPolicy size="50MB"/>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
<SizeBasedTriggeringPolicy size="50MB" />
<TimeBasedTriggeringPolicy interval="1" modulate="true" />
</Policies>
<!-- Keep 5 files per day & auto delete after exceeding 2GB or 100 files -->
<DefaultRolloverStrategy max="5">
<Delete basePath="${LOG_PATH}" maxDepth="2">
<IfFileName glob="*/*.log"/>
<IfFileName glob="*/*.log" />
<!-- Limit log amount & size -->
<IfAny>
<IfAccumulatedFileSize exceeds="2GB"/>
<IfAccumulatedFileCount exceeds="100"/>
<IfAccumulatedFileSize exceeds="2GB" />
<IfAccumulatedFileCount exceeds="100" />
</IfAny>
</Delete>
</DefaultRolloverStrategy>
@@ -64,13 +64,13 @@

<loggers>
<logger name="io.grpc.netty" level="INFO" additivity="false">
<appender-ref ref="console"/>
<appender-ref ref="console" />
</logger>
<logger name="org.springframework" level="INFO" additivity="false">
<appender-ref ref="console"/>
<appender-ref ref="console" />
</logger>
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="console" />
</root>

</loggers>
@@ -87,7 +87,8 @@ public KvBatchScanner(

/**
* Construct a streaming query iterator
* Split the ScanQuery and start multiple streaming requests to improve the concurrency of Store
* Split the ScanQuery and start multiple streaming requests to improve the concurrency
* of Store
*
* @param scanQuery scanQuery
* @param handler task handler
@@ -232,13 +233,18 @@ public boolean isFinished() {
* Evaluate the maximum task number
*/
private void evaluateMaxTaskSize() {
if (maxTaskSize == 0) { // Based on the first batch of tasks, get the number of stores, and then calculate the maximum number of tasks
if (maxTaskSize ==
0) { // Based on the first batch of tasks, get the number of stores, and then
// calculate the maximum number of tasks
if (scanQuery.getOrderType() == ScanOrderType.ORDER_STRICT) {
maxTaskSize = 1; // Ordered scan: one stream per machine; the rest can start only after these streams finish
maxTaskSize =
1; // Ordered scan: one stream per machine; the rest can start
// only after these streams finish
} else {
maxTaskSize = this.notifier.getScannerCount() * maxTaskSizePerStore;
}
maxBatchSize = this.notifier.getScannerCount() * maxBatchSize; // A maximum of 1,000 per machine
maxBatchSize = this.notifier.getScannerCount() *
maxBatchSize; // A maximum of 1,000 per machine

/*
* Limit: when fewer than 10000, start a single stream to save network bandwidth
@@ -274,7 +280,8 @@ public void splitTask() {
// Evaluate the maximum task number
evaluateMaxTaskSize();
if (this.notifier.getScannerCount() < this.maxTaskSize) {
splitTask(); // Maximum number of tasks not reached, continue to split
splitTask(); // Maximum number of tasks not reached, continue to
// split
}
}
this.finished = !prefixItr.hasNext();
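The comments reformatted above encode KvBatchScanner's sizing policy: once the first batch of tasks reveals how many stores take part, a strictly ordered scan is capped at a single concurrent stream, other scans may run up to maxTaskSizePerStore streams per store, the batch size is scaled per store (roughly 1,000 per machine), and splitTask keeps splitting while the active stream count is below that ceiling. A condensed, hedged restatement of the policy follows as a standalone class; the default constants are assumptions, only the field and enum names come from the diff.

/** Standalone restatement of the sizing policy described in KvBatchScanner (a sketch, not the real class). */
public class ScanSizingPolicy {

    enum ScanOrderType { ORDER_NONE, ORDER_STRICT }

    static final int MAX_TASK_SIZE_PER_STORE = 8;     // assumed default for maxTaskSizePerStore
    static final int BATCH_SIZE_PER_STORE = 1000;     // "a maximum of 1,000 per machine"

    int maxTaskSize = 0;
    int maxBatchSize = BATCH_SIZE_PER_STORE;

    /** storeCount is what notifier.getScannerCount() reports after the first batch of tasks. */
    void evaluateMaxTaskSize(ScanOrderType orderType, int storeCount) {
        if (maxTaskSize == 0) {                        // evaluated once, after the first batch
            if (orderType == ScanOrderType.ORDER_STRICT) {
                maxTaskSize = 1;                       // ordered scan: only one stream at a time
            } else {
                maxTaskSize = storeCount * MAX_TASK_SIZE_PER_STORE;
            }
            maxBatchSize = storeCount * maxBatchSize;  // scale the batch size by store count
        }
    }

    /** splitTask() keeps splitting while the number of active streams is below the ceiling. */
    boolean shouldSplitMore(int activeScanners) {
        return activeScanners < maxTaskSize;
    }
}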
@@ -69,9 +69,12 @@ public boolean hasNext() {
int waitTime = 0;
while (current == null) {
try {
// The queue still has data, there is an active scanner, or task splitting is not finished
// The queue still has data, there is an active scanner, or task splitting is not
// finished
if (queue.size() != 0 || scanners.size() > 0 || !taskSplitter.isFinished()) {
current = queue.poll(1, TimeUnit.SECONDS); // Regularly check whether the client is closed
current = queue.poll(1,
TimeUnit.SECONDS); // Regularly check whether the client
// is closed
} else {
break;
}
@@ -179,7 +182,9 @@ public boolean hasNext() {
try {
int waitTime = 0;
Supplier<HgKvIterator<HgKvEntry>> current;
current = queue.poll(1, TimeUnit.SECONDS); // Regularly check whether the client is closed
current = queue.poll(1,
TimeUnit.SECONDS); // Regularly check whether the client
// is closed
if (current == null) {
if (++waitTime > maxWaitCount) {
break;
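Both hasNext() hunks above share one pattern: poll the result queue with a one-second timeout so the loop wakes up regularly to check whether the client was closed, whether any scanner is still active, and whether the wait budget (maxWaitCount) is spent, instead of blocking forever. A minimal, hedged sketch of that consume-with-timeout loop, with the element type and the producer check reduced to simple stand-ins:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class TimedPollSketch<T> {

    private final BlockingQueue<T> queue;
    private final int maxWaitCount;   // how many empty one-second polls are tolerated

    public TimedPollSketch(BlockingQueue<T> queue, int maxWaitCount) {
        this.queue = queue;
        this.maxWaitCount = maxWaitCount;
    }

    /** Returns the next element, or null once producers are done or the wait budget is spent. */
    public T next(BooleanSupplier producersStillActive) throws InterruptedException {
        int waitTime = 0;
        T current = null;
        while (current == null) {
            if (queue.isEmpty() && !producersStillActive.getAsBoolean()) {
                break;                                  // nothing queued and nothing running
            }
            current = queue.poll(1, TimeUnit.SECONDS);  // wake up regularly to re-check state
            if (current == null && ++waitTime > maxWaitCount) {
                break;                                  // treat a long silence as termination
            }
        }
        return current;
    }
}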
@@ -48,7 +48,8 @@
public class HeartbeatService implements Lifecycle<HgStoreEngineOptions>, PartitionStateListener {

private static final int MAX_HEARTBEAT_RETRY_COUNT = 5; // Maximum number of heartbeat retries
private static final int REGISTER_RETRY_INTERVAL = 1; // Interval between registration retries, in seconds
private static final int REGISTER_RETRY_INTERVAL = 1;
// Interval between registration retries, in seconds
private final HgStoreEngine storeEngine;
private final List<HgStoreStateListener> stateListeners;
private final Object partitionThreadLock = new Object();
@@ -123,7 +124,7 @@ public boolean isClusterReady() {

/**
* There are four types of service status:
Ready, Online, Offline, Dead (excluded from the cluster)
* Ready, Online, Offline, Dead (excluded from the cluster)
*/
protected void doStoreHeartbeat() {
while (!terminated) {
@@ -170,7 +171,8 @@ protected void doPartitionHeartbeat() {

protected void registerStore() {
try {
// Register the store; the first registration with PD generates the ID, which is automatically assigned to StoreInfo
// Register the store; the first registration with PD generates the ID, which is
// automatically assigned to StoreInfo
this.storeInfo.setStoreAddress(IpUtil.getNearestAddress(options.getGrpcAddress()));
this.storeInfo.setRaftAddress(IpUtil.getNearestAddress(options.getRaftAddress()));

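The constants touched in this hunk set the heartbeat and registration cadence: up to MAX_HEARTBEAT_RETRY_COUNT (5) consecutive heartbeat failures are tolerated, a failed registration is retried every REGISTER_RETRY_INTERVAL (1) second, and the first successful registration is what makes PD assign the store ID that then populates StoreInfo. A hedged sketch of that retry shape follows; the Registrar callback is a hypothetical stand-in for the PD registration RPC, not the real hg-pd client API.

import java.util.concurrent.TimeUnit;

public class RegisterRetrySketch {

    private static final int REGISTER_RETRY_INTERVAL = 1; // seconds, mirrors HeartbeatService

    /** Hypothetical callback standing in for the PD registration RPC. */
    public interface Registrar {
        long register() throws Exception;                 // returns the store id assigned by PD
    }

    /** Keeps retrying until PD accepts the registration; returns the assigned store id. */
    public static long registerUntilSuccess(Registrar registrar) throws InterruptedException {
        while (true) {
            try {
                return registrar.register();               // first registration generates the id
            } catch (Exception e) {
                // registration failed (e.g. PD unreachable): wait and retry
                TimeUnit.SECONDS.sleep(REGISTER_RETRY_INTERVAL);
            }
        }
    }
}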
@@ -87,7 +87,8 @@ public static HgStoreEngine getInstance() {
}

/**
* 1. Read the storeId and register with PD; the first registration makes PD generate the storeId, which is stored locally
* 1. Read the storeId and register with PD; the first registration makes PD generate the
* storeId, which is stored locally
* 2. After successful registration, start the Raft service
* 3. Periodically send store heartbeats and partition heartbeats to keep in touch with PD
*
@@ -288,7 +289,9 @@ private PartitionEngine createPartitionEngine(int groupId, ShardGroup shardGroup
if ((engine = partitionEngines.get(groupId)) == null) {
engineLocks.computeIfAbsent(groupId, k -> new Object());
synchronized (engineLocks.get(groupId)) {
// In special cases during splitting (the number of partitions in the cluster differs), the split partition may not be on this machine.
// In special cases during splitting (the number of partitions in the cluster
// differs), the split partition may not be on this
// machine.
if (conf != null) {
var list = conf.listPeers();
list.addAll(conf.listLearners());
@@ -341,7 +344,8 @@ private PartitionEngine createPartitionEngine(int groupId, ShardGroup shardGroup
}

/**
* Create the Raft group: besides creating the local Raft node, the other peers must also be notified to create their Raft nodes
* Create the Raft group: besides creating the local Raft node, the other peers must also
* be notified to create their Raft nodes
* 1. Traverse partition.shards
* 2. Look up store information by storeId
* 3. Establish Raft RPC to the other stores and send StartRaft messages
@@ -365,7 +369,8 @@ public PartitionEngine createPartitionGroups(Partition partition) {
if (store == null || partitionManager.isLocalStore(store)) {
return;
}
// Send a message to the other peers to create the Raft group. The send here is asynchronous
// Send a message to the other peers to create the Raft group. The send here is
// asynchronous
hgCmdClient.createRaftNode(store.getRaftAddress(), List.of(partition),
status -> {
log.info(
@@ -391,7 +396,8 @@ public void destroyPartitionGroups(Partition partition) {
if (store == null) {
return;
}
// Send a message to the other peers to create the Raft group. The send here is asynchronous
// Send a message to the other peers to create the Raft group. The send here is
// asynchronous
hgCmdClient.destroyRaftNode(store.getRaftAddress(),
Arrays.asList(new Partition[]{partition}),
status -> {
@@ -549,7 +555,8 @@ public List<String> getDataLocations() {
* Add a raft task
* 1. Check the partition
* 1.1 If it does not exist, query PD to check whether the partition belongs to this machine
* 1.1.1 If the partition belongs to this machine, create the Raft group and notify the other stores
* 1.1.1 If the partition belongs to this machine, create the Raft group and notify
* the other stores
* 1.1.2 If the partition is not local, raise an error
* 1.2 Check whether the partition is the leader
* 1.2.1 If it is the leader, submit the task
@@ -569,7 +576,8 @@ public void addRaftTask(String graphName, Integer partId, RaftOperation operatio
Partition partition = partitionManager.findPartition(graphName, partId);
if (partition != null) {
engine = this.createPartitionGroups(partition);
// May be migrating and should not be created; keep it inside the synchronized block to avoid later races
// May be migrating and should not be created; keep it inside the synchronized
// block to avoid later races
if (engine != null) {
engine.waitForLeader(options.getWaitLeaderTimeout() * 1000);
}
@@ -588,7 +596,8 @@ public void addRaftTask(String graphName, Integer partId, RaftOperation operatio
Store store = partitionManager.getStoreByRaftEndpoint(engine.getShardGroup(),
leader.toString());
if (store.getId() == 0) {
// The leader's store information was not found locally; the partition may not have been synchronized yet, so re-obtain it from the leader.
// The leader's store information was not found locally; the partition may not
// have been synchronized yet, so re-obtain it from the leader.
Store leaderStore = hgCmdClient.getStoreInfo(leader.toString());
store = leaderStore != null ? leaderStore : store;
log.error("getStoreByRaftEndpoint error store:{}, shard: {}, leader is {}",
@@ -702,7 +711,8 @@ public void run(Status status) {
}

/**
* The partition object's key range and state have changed; actively look for the leader and then notify the other followers
* The partition object's key range and state have changed; actively look for the leader
* and then notify the other followers
*/
@Override
public UpdatePartitionResponse rangeOrStateChanged(UpdatePartitionRequest request) {
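The addRaftTask Javadoc above describes a routing decision: look up the partition; if it is unknown locally, ask PD whether it should live on this store and create the Raft group if so; if the local node is the partition leader the task is applied directly, otherwise it is forwarded to the leader (re-fetching the leader's store info when the local view is stale). A compact, hedged sketch of that decision tree follows; every type here is a simplified stand-in, not the real Partition/PartitionEngine classes.

/** Standalone illustration of the routing logic sketched in HgStoreEngine.addRaftTask. */
public class RaftTaskRouterSketch {

    interface Engine {
        boolean isLeader();
        void submit(Runnable task);
        String leaderAddress();
    }

    interface Cluster {
        Engine localEngine(String graph, int partId);            // null if no local raft group
        boolean partitionBelongsHere(String graph, int partId);  // answered by PD
        Engine createPartitionGroups(String graph, int partId);
        void forwardToLeader(String leaderAddress, Runnable task);
    }

    static void addRaftTask(Cluster cluster, String graph, int partId, Runnable task) {
        Engine engine = cluster.localEngine(graph, partId);
        if (engine == null) {
            // Unknown locally: only create the raft group if PD says the partition is ours.
            if (!cluster.partitionBelongsHere(graph, partId)) {
                throw new IllegalStateException("partition " + partId + " is not on this store");
            }
            engine = cluster.createPartitionGroups(graph, partId);
        }
        if (engine.isLeader()) {
            engine.submit(task);                                   // leader applies the task directly
        } else {
            cluster.forwardToLeader(engine.leaderAddress(), task); // follower forwards it
        }
    }
}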
@@ -260,7 +260,8 @@ public LogStorage createLogStorage(final String uri, final RaftOptions raftOptio
this.raftNode = raftGroupService.start(false);
this.raftNode.addReplicatorStateListener(new ReplicatorStateListener());

// Check whether the peers returned by PD are consistent with the local ones. If they are not consistent, reset the peer list
// Check whether the peers returned by PD are consistent with the local ones. If they are
// not consistent, reset the peer list
if (this.raftNode != null) {
// TODO: check the peer list and reset it if the peers have changed
started = true;
@@ -280,7 +281,8 @@ public ShardGroup getShardGroup() {
}

/**
* 1. Receive the partition migration instruction sent by PD and add the migration task to the state machine; the task state is newly created
* 1. Receive the partition migration instruction sent by PD and add the migration task to
* the state machine; the task state is newly created
* 2. Execute the state machine message, add it to the task queue, and execute the task
* 3. Compare the new and old peers to find the added and deleted peers
* 4. If there is a new peer
@@ -364,7 +366,9 @@ public Status changePeers(List<String> peers, final Closure done) {
result = HgRaftError.TASK_ERROR.toStatus();
}
} else if (snapshotOk) {
result = Status.OK(); // Without Learner, it means that only delete operations are done
result =
Status.OK(); // Without Learner, it means that only delete operations
// are done
}
}
if (result.isOk()) {
@@ -563,7 +567,8 @@ public Endpoint waitForLeader(long timeOut) {
if (partitionManager.isLocalPartition(this.options.getGroupId())) {
log.error("Raft {} leader not found, try to repair!",
this.options.getGroupId());
// TODO judge whether the Raft group belongs to this machine; if so, try to repair the leader, including checking whether the configuration is correct
// TODO judge whether the Raft group belongs to this machine; if so, try to
// repair the leader, including checking whether the configuration is correct
storeEngine.createPartitionGroups(
partitionManager.getPartitionList(getGroupId()).get(0));
}
@@ -734,7 +739,8 @@ public Status transferLeader(String graphName, Metapb.Shard shard) {
* 1. Compare the new and old peers to find the added and deleted peers
* 2. Add the newly added peers as learners
* 3. Watch the snapshot synchronization event
* 4. After the snapshot is synchronized, call changePeers to promote the learners to followers and delete the old peers
* 4. After the snapshot is synchronized, call changePeers to promote the learners to
* followers and delete the old peers
*/
public void doChangeShard(final MetaTask.Task task, Closure done) {
if (!isLeader()) {
@@ -757,7 +763,8 @@ public void doChangeShard(final MetaTask.Task task, Closure done) {
List<String> peers =
partitionManager.shards2Peers(task.getChangeShard().getShardList());
HashSet<String> hashSet = new HashSet<>(peers);
// Duplicate peers in the task indicate that the task itself is wrong, so the task is ignored
// Duplicate peers in the task indicate that the task itself is wrong, so the
// task is ignored
if (peers.size() != hashSet.size()) {
log.info("Raft {} doChangeShard peer is repeat, peers:{}", getGroupId(),
peers);
@@ -965,14 +972,16 @@ private Status handleMoveTask(MetaTask.Task task) {
}

/**
* Cleanup for whole-graph deletion: delete the partition, and if no other graphs remain, destroy the Raft group.
* Cleanup for whole-graph deletion: delete the partition, and if no other graphs remain,
* destroy the Raft group.
* Needs to be placed at the moveData call
*
* @param graphName graph name
* @param partitionId partition id
* @param keyStart key range start, used for verification
* @param keyEnd key range end, used for verification
* @param isLeader the leader status captured when moveData runs, to avoid leader drift
* @param graphName graph name
* @param partitionId partition id
* @param keyStart key range start, used for verification
* @param keyEnd key range end, used for verification
* @param isLeader the leader status captured when moveData runs, to avoid leader
* drift
*/
private synchronized void destroyPartitionIfGraphsNull(String graphName, int partitionId,
long keyStart, long keyEnd,
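The transferLeader/doChangeShard Javadoc above describes the shard-change protocol: diff the target peer list against the current one, add new peers as learners first, wait for their snapshots to finish, then call changePeers to promote the learners to followers and drop the removed peers; a task whose peer list contains duplicates is treated as malformed and ignored. A hedged, framework-free restatement of that sequence follows; the RaftGroup interface is a stand-in, not JRaft's real API.

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Standalone illustration of the learner-first shard change described in PartitionEngine. */
public class ShardChangeSketch {

    interface RaftGroup {
        Set<String> currentPeers();
        void addLearners(Set<String> peers);          // step 2: new peers join as learners
        void awaitSnapshotSync(Set<String> peers);    // step 3: wait for snapshot copy
        void changePeers(Set<String> newPeers);       // step 4: promote learners, drop old peers
    }

    static void doChangeShard(RaftGroup group, List<String> targetPeers) {
        // A duplicated peer means the task itself is wrong, so it is ignored.
        Set<String> target = new LinkedHashSet<>(targetPeers);
        if (target.size() != targetPeers.size()) {
            return;
        }
        Set<String> current = group.currentPeers();

        Set<String> added = new HashSet<>(target);
        added.removeAll(current);                      // step 1: peers to add
        if (!added.isEmpty()) {
            group.addLearners(added);
            group.awaitSnapshotSync(added);
        }
        // Without any learner to promote, this call only removes peers.
        group.changePeers(target);
    }
}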