Skip to content

Commit

Permalink
Rename IggyClient interface, add initial builder structure
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Nov 4, 2024
1 parent 97ca845 commit 7eb96cf
Show file tree
Hide file tree
Showing 29 changed files with 138 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.tcp.IggyTcpClient;
import rs.iggy.consumergroup.Consumer;
import rs.iggy.consumergroup.ConsumerGroupDetails;
import rs.iggy.identifier.ConsumerId;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.PolledMessage;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingStrategy;
import rs.iggy.stream.StreamDetails;
import rs.iggy.topic.CompressionAlgorithm;
import rs.iggy.topic.TopicDetails;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static java.util.Optional.empty;

Expand All @@ -31,17 +30,17 @@ public class SimpleConsumer {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

public static void main(String[] args) {
IggyTcpClient client = new IggyTcpClient("localhost", 8090);
var client = new IggyTcpClient("localhost", 8090);
client.users().login("iggy", "iggy");

createStream(client);
createTopic(client);
createConsumerGroup(client);
client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID);

List<PolledMessage> messages = new ArrayList<>();
var messages = new ArrayList<PolledMessage>();
while (messages.size() < 1000) {
PolledMessages polledMessages = client.messages()
var polledMessages = client.messages()
.pollMessages(STREAM_ID,
TOPIC_ID,
empty(),
Expand All @@ -57,15 +56,15 @@ public static void main(String[] args) {
}
}

private static void createStream(IggyTcpClient client) {
private static void createStream(IggyBaseClient client) {
Optional<StreamDetails> stream = client.streams().getStream(STREAM_ID);
if (stream.isPresent()) {
return;
}
client.streams().createStream(empty(), STREAM_NAME);
}

private static void createTopic(IggyTcpClient client) {
private static void createTopic(IggyBaseClient client) {
Optional<TopicDetails> topic = client.topics().getTopic(STREAM_ID, TOPIC_ID);
if (topic.isPresent()) {
return;
Expand All @@ -82,7 +81,7 @@ private static void createTopic(IggyTcpClient client) {

}

private static void createConsumerGroup(IggyTcpClient client) {
private static void createConsumerGroup(IggyBaseClient client) {
Optional<ConsumerGroupDetails> consumerGroup = client.consumerGroups()
.getConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID);
if (consumerGroup.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SimpleProducer {
private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);

public static void main(String[] args) {
IggyTcpClient client = new IggyTcpClient("localhost", 8090);
var client = new IggyTcpClient("localhost", 8090);
client.users().login("iggy", "iggy");

createStream(client);
Expand Down
12 changes: 11 additions & 1 deletion java-sdk/src/main/java/rs/iggy/Iggy.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
package rs.iggy;

public class Iggy {
import rs.iggy.clients.blocking.IggyClientBuilder;

public final class Iggy {

private Iggy() {
}

public static IggyClientBuilder clientBuilder() {
return new IggyClientBuilder();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rs.iggy.clients.blocking;

public interface IggyBaseClient {

SystemClient system();

StreamsClient streams();

UsersClient users();

TopicsClient topics();

PartitionsClient partitions();

ConsumerGroupsClient consumerGroups();

ConsumerOffsetsClient consumerOffsets();

MessagesClient messages();

PersonalAccessTokensClient personalAccessTokens();

}
24 changes: 8 additions & 16 deletions java-sdk/src/main/java/rs/iggy/clients/blocking/IggyClient.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
package rs.iggy.clients.blocking;

public interface IggyClient {
public class IggyClient {

SystemClient system();
private final IggyBaseClient baseClient;

StreamsClient streams();
public IggyClient(IggyBaseClient baseClient) {
this.baseClient = baseClient;
}

UsersClient users();

TopicsClient topics();

PartitionsClient partitions();

ConsumerGroupsClient consumerGroups();

ConsumerOffsetsClient consumerOffsets();

MessagesClient messages();

PersonalAccessTokensClient personalAccessTokens();
public IggyBaseClient getBaseClient() {
return baseClient;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package rs.iggy.clients.blocking;

public class IggyClientBuilder {

private IggyBaseClient client;

public IggyClientBuilder withBaseClient(IggyBaseClient client) {
this.client = client;
return this;
}

public IggyClient build() {
if (client == null) {
throw new IllegalArgumentException("Base client not provided");
}
return new IggyClient(client);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import rs.iggy.clients.blocking.*;

public class IggyHttpClient implements IggyClient {
public class IggyHttpClient implements IggyBaseClient {

private final SystemHttpClient systemClient;
private final StreamsHttpClient streamsClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import rs.iggy.clients.blocking.*;

public class IggyTcpClient implements IggyClient {
public class IggyTcpClient implements IggyBaseClient {

private final UsersTcpClient usersClient;
private final StreamsTcpClient streamsClient;
Expand All @@ -16,6 +16,7 @@ public class IggyTcpClient implements IggyClient {

public IggyTcpClient(String host, Integer port) {
InternalTcpClient tcpClient = new InternalTcpClient(host, port);
tcpClient.connect();
usersClient = new UsersTcpClient(tcpClient);
streamsClient = new StreamsTcpClient(tcpClient);
topicsClient = new TopicsTcpClient(tcpClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ final class InternalTcpClient {
private static final int COMMAND_LENGTH = 4;
private static final int RESPONSE_INITIAL_BYTES_LENGTH = 8;

private final Connection connection;
private final TcpClient client;
private final BlockingQueue<IggyResponse> responses = new LinkedBlockingQueue<>();
private Connection connection;

InternalTcpClient(String host, Integer port) {
this.connection = TcpClient.create()
client = TcpClient.create()
.host(host)
.port(port)
.doOnConnected(conn -> conn.addHandlerLast(new IggyResponseDecoder()))
.connectNow();
.doOnConnected(conn -> conn.addHandlerLast(new IggyResponseDecoder()));
}

void connect() {
this.connection = client.connectNow();
this.connection.inbound().receiveObject().ofType(IggyResponse.class).subscribe(responses::add);
}

Expand Down
20 changes: 20 additions & 0 deletions java-sdk/src/test/java/rs/iggy/IggyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package rs.iggy;

import org.junit.jupiter.api.Test;
import rs.iggy.clients.blocking.http.IggyHttpClient;
import static org.assertj.core.api.Assertions.assertThat;

class IggyTest {

@Test
void test() {
var baseClient = new IggyHttpClient("http://localhost:8080");

var client = Iggy.clientBuilder()
.withBaseClient(baseClient)
.build();

assertThat(client).isNotNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public abstract class IntegrationTest {
protected final GenericContainer<?> iggyServer = new GenericContainer<>(DockerImageName.parse("iggyrs/iggy:latest"))
.withExposedPorts(HTTP_PORT, TCP_PORT);

protected IggyClient client;
protected IggyBaseClient client;

@BeforeEach
void beforeEachIntegrationTest() {
client = getClient();
}

abstract protected IggyClient getClient();
abstract protected IggyBaseClient getClient();

protected void setUpStream() {
client.streams().createStream(of(42L), "test-stream");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.ConsumerGroupsClientBaseTest;
import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;

class ConsumerGroupsHttpClientTest extends ConsumerGroupsClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.ConsumerOffsetsClientBaseTest;
import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;

class ConsumerOffsetsHttpClientTest extends ConsumerOffsetsClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.MessagesClientBaseTest;

class MessagesHttpClientTest extends MessagesClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.PartitionsClientBaseTest;

class PartitionsHttpClientTest extends PartitionsClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.PersonalAccessTokensBaseTest;

class PersonalAccessTokensHttpClientTest extends PersonalAccessTokensBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.StreamClientBaseTest;

class StreamHttpClientTest extends StreamClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.SystemClientBaseTest;

class SystemHttpClientTest extends SystemClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.TopicsClientBaseTest;

class TopicsHttpClientTest extends TopicsClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package rs.iggy.clients.blocking.http;

import org.junit.jupiter.api.Test;
import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.IggyBaseClient;
import rs.iggy.clients.blocking.UsersClientBaseTest;
import static org.assertj.core.api.Assertions.assertThat;

class UsersHttpClientTest extends UsersClientBaseTest {

@Override
protected IggyClient getClient() {
protected IggyBaseClient getClient() {
return HttpClientFactory.create(iggyServer);
}

Expand Down
Loading

0 comments on commit 7eb96cf

Please sign in to comment.