Skip to content

Commit

Permalink
Initial barebones implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 13, 2024
1 parent 3aeeb0e commit 76670a0
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 0 deletions.
59 changes: 59 additions & 0 deletions proto/clustering.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
syntax = "proto3";
package beemo.clustering;

import "google/protobuf/empty.proto";

option java_multiple_files = true;
option java_package = "gg.beemo.latte.proto";
option java_outer_classname = "ClusteringProto";


service Clustering {
rpc GetClusterConfig(GetClusterConfigRequest) returns (GetClusterConfigResponse);
rpc UpdateGuildStates(stream UpdateGuildStateRequest) returns (google.protobuf.Empty);
rpc LookupGuildCluster(LookupGuildClusterRequest) returns (LookupGuildClusterResponse);
}

// --- General --- //

message ShardIdentifier {
string cluster_id = 1;
uint32 shard_id = 2;
uint32 shard_count = 3;
}

// --- Cluster Config --- //

message GetClusterConfigRequest {
string cluster_id = 1;
string grpc_endpoint = 2;
}

message GetClusterConfigResponse {
repeated ShardIdentifier shards = 1;
}

// --- Guild State --- //

message UpdateGuildStateRequest {
ShardIdentifier shard = 1;
fixed64 guild_id = 2;
GuildState state = 3;
}

enum GuildState {
AVAILABLE = 0;
UNAVAILABLE = 1;
DELETED = 2;
}

// --- Guild Lookup --- //

message LookupGuildClusterRequest {
fixed64 guild_id = 1;
}

message LookupGuildClusterResponse {
string cluster_id = 1;
string grpc_endpoint = 2;
}
2 changes: 2 additions & 0 deletions vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import gg.beemo.latte.CommonConfig
import gg.beemo.latte.broker.rabbitmq.RabbitConnection
import gg.beemo.latte.config.Configurator
import gg.beemo.latte.logging.Log
import gg.beemo.vanilla.rpc.GrpcClusteringService
import io.grpc.Server
import io.grpc.ServerBuilder
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -35,6 +36,7 @@ object Vanilla {
log.debug("Initializing gRPC Ratelimit client")
val grpcServer: Server = ServerBuilder.forPort(Config.GRPC_PORT)
.addService(GrpcRatelimitService())
.addService(GrpcClusteringService())
.build()
.start()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package gg.beemo.vanilla.rpc

import com.google.protobuf.Empty
import gg.beemo.latte.logging.Log
import gg.beemo.latte.proto.ClusterConfigRequest
import gg.beemo.latte.proto.ClusterConfigResponse
import gg.beemo.latte.proto.clusterConfigResponse
import gg.beemo.latte.proto.ClusteringGrpcKt
import gg.beemo.latte.proto.GetClusterConfigRequest
import gg.beemo.latte.proto.GetClusterConfigResponse
import gg.beemo.latte.proto.GuildState
import gg.beemo.latte.proto.LookupGuildClusterRequest
import gg.beemo.latte.proto.LookupGuildClusterResponse
import gg.beemo.latte.proto.ShardIdentifier
import gg.beemo.latte.proto.shardIdentifier
import gg.beemo.latte.proto.UpdateGuildStateRequest
import gg.beemo.latte.proto.getClusterConfigResponse
import gg.beemo.latte.proto.lookupGuildClusterResponse
import kotlinx.coroutines.flow.Flow
import java.util.HashMap

data class ClusterConfig(
val clusterId: String,
val grpcEndpoint: String,
)

data class GuildStatus(
val guildId: Long,
val shard: ShardIdentifier,
val state: GuildState,
)

class GrpcClusteringService : ClusteringGrpcKt.ClusteringCoroutineImplBase() {

private val log by Log

private val clusters = HashMap<String, ClusterConfig>()
private val guilds = HashMap<Long, GuildStatus>()

override suspend fun getClusterConfig(request: GetClusterConfigRequest): GetClusterConfigResponse {
log.info("Received cluster config request from cluster ID '${request.clusterId}'")
this.clusters[request.clusterId] = ClusterConfig(
clusterId = request.clusterId,
grpcEndpoint = request.grpcEndpoint,
)
// TODO Return correct shard mapping
return getClusterConfigResponse {
this.shards += listOf(
shardIdentifier {
this.clusterId = "lol"
this.shardId = 0
this.shardCount = 1
},
)
}
}

override suspend fun updateGuildStates(requests: Flow<UpdateGuildStateRequest>): Empty {
requests.collect { update ->
val shard = update.shard
log.debug(
"Guild {} in Cluster {} Shard {}/{} has changed state to {}",
update.guildId, shard.clusterId, shard.shardId, shard.clusterId, update.state,
)
if (!clusters.containsKey(shard.clusterId)) {
log.warn("Unknown cluster {} in guild update for {}", shard.clusterId, update.guildId)
}
if (update.state == GuildState.DELETED) {
guilds.remove(update.guildId)
} else {
guilds[update.guildId] = GuildStatus(guildId = update.guildId, shard = shard, state = update.state)
}
}
return Empty.getDefaultInstance()
}

override suspend fun lookupGuildCluster(request: LookupGuildClusterRequest): LookupGuildClusterResponse {
val guild = guilds[request.guildId]
requireNotNull(guild) // TODO How to properly return errors in gRPC?
val cluster = clusters[guild.shard.clusterId]
requireNotNull(cluster) // TODO Same as above
return lookupGuildClusterResponse {
this.clusterId = cluster.clusterId
this.grpcEndpoint = cluster.grpcEndpoint
}
}

}
1 change: 1 addition & 0 deletions vanilla/src/main/proto/clustering.proto

0 comments on commit 76670a0

Please sign in to comment.