Skip to content

Commit

Permalink
feat: add game-player-data service
Browse files Browse the repository at this point in the history
  • Loading branch information
ZakShearman committed Nov 3, 2024
1 parent f17e385 commit 99cfd28
Show file tree
Hide file tree
Showing 19 changed files with 1,158 additions and 367 deletions.
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ go 1.23
use (
.
./libraries/test
./services/game-player-data
./services/mc-player-service
./services/message-handler
)
376 changes: 9 additions & 367 deletions go.work.sum

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions services/game-player-data/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
FROM --platform=$BUILDPLATFORM golang:1.23-alpine AS build

ARG SERVICE_NAME=game-player-data

WORKDIR /build

# Copy sources
COPY . .

WORKDIR /build/services/$SERVICE_NAME

RUN go mod download

WORKDIR /build

ARG TARGETOS
ARG TARGETARCH

RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg \
CGO_ENABLED=0 \
GOOS=$TARGETOS \
GOARCH=$TARGETARCH \
go build -ldflags="-s -w" -o $SERVICE_NAME ./services/$SERVICE_NAME/cmd/$SERVICE_NAME

FROM alpine

ARG SERVICE_NAME=game-player-data

WORKDIR /app

COPY --from=build /build/$SERVICE_NAME /build/services/$SERVICE_NAME/run ./
CMD ["./game-player-data"]
35 changes: 35 additions & 0 deletions services/game-player-data/cmd/game-player-data/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"github.com/emortalmc/mono-services/services/game-player-data/internal/app"
"github.com/emortalmc/mono-services/services/game-player-data/internal/config"
"go.uber.org/zap"
"log"
)

func main() {
cfg, err := config.LoadGlobalConfig()
if err != nil {
log.Fatal("failed to load config", err)
}

unsugared, err := createLogger(cfg)
if err != nil {
log.Fatal(err)
}
logger := unsugared.Sugar()

app.Run(cfg, logger)
}

func createLogger(cfg *config.Config) (logger *zap.Logger, err error) {
if cfg.Development {
logger, err = zap.NewDevelopment()
} else {
logger, err = zap.NewProduction()
}
if err != nil {
return nil, err
}
return logger, nil
}
50 changes: 50 additions & 0 deletions services/game-player-data/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module github.com/emortalmc/mono-services/services/game-player-data

go 1.23

require (
github.com/emortalmc/proto-specs/gen/go v0.0.0-20240927103241-2584fd28e0f9
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/segmentio/kafka-go v0.4.47
github.com/spf13/viper v1.19.0
go.mongodb.org/mongo-driver v1.17.1
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
)

require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
249 changes: 249 additions & 0 deletions services/game-player-data/go.sum

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions services/game-player-data/internal/app/game_player_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package app

import (
"context"
"github.com/emortalmc/mono-services/services/game-player-data/internal/config"
"github.com/emortalmc/mono-services/services/game-player-data/internal/kafka"
"github.com/emortalmc/mono-services/services/game-player-data/internal/repository"
"github.com/emortalmc/mono-services/services/game-player-data/internal/service"
"go.uber.org/zap"
"os/signal"
"sync"
"syscall"
)

func Run(cfg *config.Config, logger *zap.SugaredLogger) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
wg := &sync.WaitGroup{}

repoCtx, repoCancel := context.WithCancel(ctx)
repoWg := &sync.WaitGroup{}

mongoDB, err := repository.CreateDatabase(repoCtx, cfg.MongoDB, repoWg, logger)
if err != nil {
logger.Fatalw("failed to create database", err)
}

repoColl := repository.NewGamePlayerDataRepoColl(mongoDB)

kafka.NewConsumer(ctx, wg, cfg.Kafka, logger, repoColl)

service.RunServices(ctx, logger, wg, cfg, repoColl)

wg.Wait()
logger.Info("shutting down")

logger.Info("shutting down repository")
repoCancel()
repoWg.Wait()
}
44 changes: 44 additions & 0 deletions services/game-player-data/internal/config/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package config

import (
"github.com/spf13/viper"
"strings"
)

type Config struct {
Kafka *KafkaConfig
MongoDB *MongoDBConfig

Development bool

Port uint16
}

type KafkaConfig struct {
Host string
Port int
}

type MongoDBConfig struct {
URI string
}

func LoadGlobalConfig() (config *Config, err error) {
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()

viper.SetConfigName("config")
viper.AddConfigPath(".")

err = viper.ReadInConfig()
if err != nil {
return
}

err = viper.Unmarshal(&config)
if err != nil {
return
}

return
}
141 changes: 141 additions & 0 deletions services/game-player-data/internal/kafka/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package kafka

import (
"context"
"fmt"
"github.com/emortalmc/mono-services/services/game-player-data/internal/config"
"github.com/emortalmc/mono-services/services/game-player-data/internal/repository"
"github.com/emortalmc/mono-services/services/game-player-data/internal/repository/model"
pbmsg "github.com/emortalmc/proto-specs/gen/go/message/gameplayerdata"
pbmodel "github.com/emortalmc/proto-specs/gen/go/model/gameplayerdata"
"github.com/emortalmc/proto-specs/gen/go/nongenerated/kafkautils"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"sync"
)

const GamePlayerDataTopic = "game-player-data"

type consumer struct {
logger *zap.SugaredLogger
repos *repository.GameDataRepoColl

reader *kafka.Reader
}

func NewConsumer(ctx context.Context, wg *sync.WaitGroup, cfg *config.KafkaConfig, logger *zap.SugaredLogger,
repos *repository.GameDataRepoColl) {

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)},
GroupID: "game-player-data",
GroupTopics: []string{GamePlayerDataTopic},

Logger: kafka.LoggerFunc(func(format string, args ...interface{}) {
logger.Infow(fmt.Sprintf(format, args...))
}),
ErrorLogger: kafka.LoggerFunc(func(format string, args ...interface{}) {
logger.Errorw(fmt.Sprintf(format, args...))
}),
})

c := &consumer{
logger: logger,
repos: repos,

reader: reader,
}

handler := kafkautils.NewConsumerHandler(logger, reader)
handler.RegisterHandler(&pbmsg.UpdateGamePlayerDataMessage{}, c.handleUpdateGamePlayerDataMessage)

logger.Infow("starting listening for kafka messages", "topics", reader.Config().GroupTopics)

wg.Add(1)
go func() {
defer wg.Done()
handler.Run(ctx) // Run is blocking until the context is cancelled
if err := reader.Close(); err != nil {
logger.Errorw("error closing kafka reader", "error", err)
}
}()
}

func (c *consumer) handleUpdateGamePlayerDataMessage(ctx context.Context, _ *kafka.Message, uncast proto.Message) {
msg := uncast.(*pbmsg.UpdateGamePlayerDataMessage)

pId, err := uuid.Parse(msg.PlayerId)
if err != nil {
c.logger.Errorw("failed to parse player id", "error", err)
return
}

switch msg.GameMode {
case pbmodel.GameDataGameMode_BLOCK_SUMO:
err = c.handleBlockSumoUpdate(ctx, pId, msg)
case pbmodel.GameDataGameMode_MARATHON:
err = c.handleMarathonUpdate(ctx, pId, msg)

default:
c.logger.Errorw("unsupported game mode", "gameMode", msg.GameMode)
}

if err != nil {
c.logger.Errorw("failed to handle update", "error", err, "playerId", pId, "gameMode", msg.GameMode)
return
}
}

func (c *consumer) handleMarathonUpdate(ctx context.Context, pId uuid.UUID, msg *pbmsg.UpdateGamePlayerDataMessage) error {
gameData, err := c.repos.Marathon.GetOrDefault(ctx, pId, &model.MarathonData{BaseGameData: model.BaseGameData{PlayerId: pId}})
if err != nil {
return fmt.Errorf("failed to get block sumo data: %w", err)

}

msgData := &pbmodel.V1MarathonData{}

if err := anypb.UnmarshalTo(msg.Data, msgData, proto.UnmarshalOptions{}); err != nil {
return fmt.Errorf("failed to unmarshal data: %w", err)
}

for _, path := range msg.DataMask.Paths {
switch path {
case "block_palette":
gameData.BlockPalette = msgData.BlockPalette
case "time":
gameData.Time = msgData.Time
case "animation":
gameData.Animation = *msgData.Animation
}
}

return c.repos.Marathon.Save(ctx, gameData)
}

func (c *consumer) handleBlockSumoUpdate(ctx context.Context, pId uuid.UUID, msg *pbmsg.UpdateGamePlayerDataMessage) error {
gameData, err := c.repos.BlockSumo.GetOrDefault(ctx, pId, &model.BlockSumoData{BaseGameData: model.BaseGameData{PlayerId: pId}})
if err != nil {
return fmt.Errorf("failed to get block sumo data: %w", err)
}

msgData := &pbmodel.V1BlockSumoPlayerData{}

if err := anypb.UnmarshalTo(msg.Data, msgData, proto.UnmarshalOptions{}); err != nil {
return fmt.Errorf("failed to unmarshal data: %w", err)
}

for _, path := range msg.DataMask.Paths {
switch path {
case "block_slot":
gameData.BlockSlot = msgData.BlockSlot
case "shears_slot":
gameData.ShearsSlot = msgData.ShearsSlot
}
}

return c.repos.BlockSumo.Save(ctx, gameData)
}
Loading

0 comments on commit 99cfd28

Please sign in to comment.