Skip to content

Commit

Permalink
Relocate methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Jan 29, 2019
1 parent c715b5e commit f8e8547
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -2345,7 +2344,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
*/
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -166,23 +164,6 @@ static String encodeRetentionLease(final RetentionLease retentionLease) {
retentionLease.source());
}

/**
* Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
* encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}, prefixed
* by the version of the retention lease collection.
*
* @param retentionLeases the retention leases
* @return the encoding of the retention leases
*/
public static String encodeRetentionLeases(final RetentionLeases retentionLeases) {
Objects.requireNonNull(retentionLeases);
return String.format(
Locale.ROOT,
"version:%d;%s",
retentionLeases.version(),
retentionLeases.retentionLeases().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")));
}

/**
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
*
Expand All @@ -204,35 +185,6 @@ static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
}

/**
* Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}.
*
* @param encodedRetentionLeases an encoded collection of retention leases
* @return the decoded retention leases
*/
public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) {
Objects.requireNonNull(encodedRetentionLeases);
if (encodedRetentionLeases.isEmpty()) {
return RetentionLeases.EMPTY;
}
assert encodedRetentionLeases.matches("version:\\d+;.*") : encodedRetentionLeases;
final int firstSemicolon = encodedRetentionLeases.indexOf(";");
final long version = Long.parseLong(encodedRetentionLeases.substring("version:".length(), firstSemicolon));
final Collection<RetentionLease> retentionLeases;
if (firstSemicolon + 1 == encodedRetentionLeases.length()) {
retentionLeases = Collections.emptyList();
} else {
assert Arrays.stream(encodedRetentionLeases.substring(firstSemicolon + 1).split(","))
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
: encodedRetentionLeases;
retentionLeases = Arrays.stream(encodedRetentionLeases.substring(firstSemicolon + 1).split(","))
.map(RetentionLease::decodeRetentionLease)
.collect(Collectors.toList());
}

return new RetentionLeases(version, retentionLeases);
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Represents a versioned collection of retention leases. We version the collection of retention leases to ensure that sync requests that
Expand Down Expand Up @@ -104,6 +107,52 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeCollection(retentionLeases);
}

/**
* Encodes a retention lease collection as a string. This encoding can be decoded by
* {@link RetentionLeases#decodeRetentionLeases(String)}. The encoding is a comma-separated encoding of each retention lease as encoded
* by {@link RetentionLease#encodeRetentionLease(RetentionLease)}, prefixed by the version of the retention lease collection.
*
* @param retentionLeases the retention lease collection
* @return the encoding of the retention lease collection
*/
public static String encodeRetentionLeases(final RetentionLeases retentionLeases) {
Objects.requireNonNull(retentionLeases);
return String.format(
Locale.ROOT,
"version:%d;%s",
retentionLeases.version(),
retentionLeases.retentionLeases().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")));
}

/**
* Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}.
*
* @param encodedRetentionLeases an encoded retention lease collection
* @return the decoded retention lease collection
*/
public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) {
Objects.requireNonNull(encodedRetentionLeases);
if (encodedRetentionLeases.isEmpty()) {
return EMPTY;
}
assert encodedRetentionLeases.matches("version:\\d+;.*") : encodedRetentionLeases;
final int firstSemicolon = encodedRetentionLeases.indexOf(";");
final long version = Long.parseLong(encodedRetentionLeases.substring("version:".length(), firstSemicolon));
final Collection<RetentionLease> retentionLeases;
if (firstSemicolon + 1 == encodedRetentionLeases.length()) {
retentionLeases = Collections.emptyList();
} else {
assert Arrays.stream(encodedRetentionLeases.substring(firstSemicolon + 1).split(","))
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
: encodedRetentionLeases;
retentionLeases = Arrays.stream(encodedRetentionLeases.substring(firstSemicolon + 1).split(","))
.map(RetentionLease::decodeRetentionLease)
.collect(Collectors.toList());
}

return new RetentionLeases(version, retentionLeases);
}

@Override
public String toString() {
return "RetentionLeases{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,7 @@ static RetentionLeases getRetentionLeases(final SegmentInfos segmentInfos) {
if (committedRetentionLeases == null) {
return RetentionLeases.EMPTY;
}
return RetentionLease.decodeRetentionLeases(committedRetentionLeases);
return RetentionLeases.decodeRetentionLeases(committedRetentionLeases);
}

private void trimUnsafeCommits() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5325,7 +5325,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
} else {
assertThat(
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
equalTo(RetentionLease.encodeRetentionLeases(leases)));
equalTo(RetentionLeases.encodeRetentionLeases(leases)));
}
}
if (rarely()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
latch.await();

// check retention leases have been committed on the primary
final RetentionLeases primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(toMap(primaryCommittedRetentionLeases)));

Expand All @@ -89,7 +89,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));

// check retention leases have been committed on the replica
final RetentionLeases replicaCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(toMap(replicaCommittedRetentionLeases)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,4 @@ public void testRetentionLeaseEncoding() {
assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease));
}

public void testRetentionLeasesEncoding() {
final long version = randomNonNegativeLong();
final int length = randomIntBetween(0, 8);
final List<RetentionLease> retentionLeases = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomNonNegativeLong();
final long timestamp = randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
retentionLeases.add(retentionLease);
}
final RetentionLeases decodedRetentionLeases =
RetentionLease.decodeRetentionLeases(RetentionLease.encodeRetentionLeases(new RetentionLeases(version, retentionLeases)));
assertThat(decodedRetentionLeases.version(), equalTo(version));
if (length == 0) {
assertThat(decodedRetentionLeases.retentionLeases(), empty());
} else {
assertThat(decodedRetentionLeases.retentionLeases(), contains(retentionLeases.toArray(new RetentionLease[0])));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.*;

public class RetentionLeasesTests extends ESTestCase {

public void testRetentionLeasesEncoding() {
final long version = randomNonNegativeLong();
final int length = randomIntBetween(0, 8);
final List<RetentionLease> retentionLeases = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomNonNegativeLong();
final long timestamp = randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
retentionLeases.add(retentionLease);
}
final RetentionLeases decodedRetentionLeases =
RetentionLeases.decodeRetentionLeases(RetentionLeases.encodeRetentionLeases(new RetentionLeases(version, retentionLeases)));
assertThat(decodedRetentionLeases.version(), equalTo(version));
if (length == 0) {
assertThat(decodedRetentionLeases.retentionLeases(), empty());
} else {
assertThat(decodedRetentionLeases.retentionLeases(), contains(retentionLeases.toArray(new RetentionLease[0])));
}
}

}

0 comments on commit f8e8547

Please sign in to comment.