Skip to content

Commit

Permalink
Merge pull request #21 from brunograssano/separar_ex4
Browse files Browse the repository at this point in the history
[EDIT] Refactor Ex4 - 4 services
  • Loading branch information
Bata340 authored Oct 20, 2023
2 parents 45bee5c + e6352f4 commit 824b93b
Show file tree
Hide file tree
Showing 42 changed files with 1,331 additions and 246 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ jobs:
go build -v ./simple_saver/...
go build -v ./server/...
go build -v ./client/...
go build -v -o ex4 ./ex4/...
go build -v -o ex4_sink ./ex4_sink/...
go build -v -o avg_calculator_ex4 ./avg_calculator_ex4/...
go build -v -o dispatcher_ex4 ./dispatcher_ex4/...
go build -v -o ex4_journey_saver ./ex4_journey_saver/...
go build -v ./saver_ex_3/...
- name: Test
Expand All @@ -46,5 +49,8 @@ jobs:
go test -v ./simple_saver/...
go test -v ./server/...
go test -v ./client/...
go test -v ./ex4/...
go test -v ./ex4_sink/...
go test -v ./avg_calculator_ex4/...
go test -v ./dispatcher_ex4/...
go test -v ./ex4_journey_saver/...
go test -v ./saver_ex_3/...
15 changes: 12 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ build:
go build -v ./simple_saver/...
go build -v ./server/...
go build -v ./client/...
go build -v ./ex4/...
go build -v ./ex4_sink/...
go build -v ./avg_calculator_ex4/...
go build -v ./dispatcher_ex4/...
go build -v ./ex4_journey_saver/...
go build -v ./saver_ex_3/...
.PHONY: build

Expand All @@ -29,7 +32,10 @@ test:
go test -v ./simple_saver/...
go test -v ./server/...
go test -v ./client/...
go test -v ./ex4/...
go test -v ./ex4_sink/...
go test -v ./avg_calculator_ex4/...
go test -v ./dispatcher_ex4/...
go test -v ./ex4_journey_saver/...
go test -v ./saver_ex_3/...
.PHONY: test

Expand All @@ -42,7 +48,10 @@ docker-image:
docker build -f ./simple_saver/Dockerfile -t "simple_saver:latest" .
docker build -f ./server/Dockerfile -t "server:latest" .
docker build -f ./client/Dockerfile -t "client:latest" .
docker build -f ./ex4/Dockerfile -t "ex4_handler:latest" .
docker build -f ./ex4_journey_saver/Dockerfile -t "ex4_journey_saver:latest" .
docker build -f ./ex4_sink/Dockerfile -t "ex4_sink:latest" .
docker build -f ./dispatcher_ex4/Dockerfile -t "dispatcher_ex4:latest" .
docker build -f ./avg_calculator_ex4/Dockerfile -t "avg_calculator_ex4:latest" .
docker build -f ./saver_ex_3/Dockerfile -t "saver_ex_3:latest" .
# Execute this command from time to time to clean up intermediate stages generated
# during client build (your hard drive will like this :) ). Don't left uncommented if you
Expand Down
19 changes: 19 additions & 0 deletions avg_calculator_ex4/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM golang:1.21 AS builder
# Client uses docker multistage builds feature https://docs.docker.com/develop/develop-images/multistage-build/
# First stage is used to compile golang binary and second stage is used to only copy the
# binary generated to the deploy image.
# Docker multi stage does not delete intermediate stages used to build our image, so we need
# to delete it by ourselves. Since docker does not give a good alternative to delete the intermediate images
# we are adding a very specific label to the image to then find these kind of images and delete them
LABEL intermediateStageToBeDeleted=true

RUN mkdir -p /build
WORKDIR /build/
COPY .. .
# CGO_ENABLED must be disabled to run go binary in Alpine
RUN CGO_ENABLED=0 GOOS=linux go build -o bin/avg_calculator_ex4 avg_calculator_ex4


FROM busybox:latest
COPY --from=builder /build/bin/avg_calculator_ex4 /avg_calculator_ex4
ENTRYPOINT ["/avg_calculator_ex4"]
47 changes: 23 additions & 24 deletions ex4/ex4config.go → avg_calculator_ex4/avg_calculator_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ import (
"strings"

"github.com/brunograssano/Distribuidos-TP1/common/config"
"github.com/brunograssano/Distribuidos-TP1/common/utils"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

// Ex4Config The configuration of the application
type Ex4Config struct {
ID string
InputQueueName string
OutputQueueName string
RabbitAddress string
InternalSaversCount uint
// AvgCalculatorConfig The configuration of the application
type AvgCalculatorConfig struct {
ID string
InputQueueName string
OutputQueueName string
RabbitAddress string
SaversCount uint
}

// InitEnv Initializes the configuration properties from a config file and environment
Expand All @@ -33,18 +32,18 @@ func InitEnv() (*viper.Viper, error) {
_ = v.BindEnv("rabbitmq", "address")
_ = v.BindEnv("rabbitmq", "queue", "input")
_ = v.BindEnv("rabbitmq", "queue", "output")
_ = v.BindEnv("internal", "savers", "count")
_ = v.BindEnv("savers", "count")

v.SetConfigFile("./config.yaml")
if err := v.ReadInConfig(); err != nil {
log.Warnf("Ex4Config | Warning Message | Configuration could not be read from config file. Using env variables instead")
log.Warnf("AvgCalculatorConfig | Warning Message | Configuration could not be read from config file. Using env variables instead")
}

return v, nil
}

// GetConfig Validates and returns the configuration of the application
func GetConfig(env *viper.Viper) (*Ex4Config, error) {
func GetConfig(env *viper.Viper) (*AvgCalculatorConfig, error) {
if err := config.InitLogger(env.GetString("log.level")); err != nil {
return nil, err
}
Expand All @@ -69,28 +68,28 @@ func GetConfig(env *viper.Viper) (*Ex4Config, error) {
return nil, errors.New("missing rabbitmq address")
}

internalSaversCount := env.GetUint("internal.savers.count")
if internalSaversCount <= 0 || internalSaversCount > utils.MaxGoroutines {
log.Warnf("Ex4Config | Not a valid value '%v' for internal savers count, using default", internalSaversCount)
internalSaversCount = utils.DefaultGoroutines
saversCount := env.GetUint("savers.count")
if saversCount <= 0 {
return nil, errors.New("invalid savers count")
}

if err := config.InitLogger(env.GetString("log.level")); err != nil {
return nil, err
}

log.Infof("Ex4Config | action: config | result: success | id: %s | log_level: %s | rabbitAddress: %v | inputQueueName: %v | internalSaversCount: %v",
log.Infof("AvgCalculatorConfig | action: config | result: success | id: %s | log_level: %s | rabbitAddress: %v | inputQueueName: %v | outputQueueName: %v | saversCount: %v",
id,
env.GetString("log.level"),
rabbitAddress,
inputQueueName,
internalSaversCount)

return &Ex4Config{
ID: id,
InputQueueName: inputQueueName,
OutputQueueName: outputQueueName,
RabbitAddress: rabbitAddress,
InternalSaversCount: internalSaversCount,
outputQueueName,
saversCount)

return &AvgCalculatorConfig{
ID: id,
InputQueueName: inputQueueName,
OutputQueueName: outputQueueName,
RabbitAddress: rabbitAddress,
SaversCount: saversCount,
}, nil
}
16 changes: 9 additions & 7 deletions ex4/avgcalculator.go → avg_calculator_ex4/avgcalculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
)

type AvgCalculator struct {
toInternalSaversChannels []queueProtocol.ProducerProtocolInterface
pricesConsumer queueProtocol.ConsumerProtocolInterface
toJourneySavers []queueProtocol.ProducerProtocolInterface
pricesConsumer queueProtocol.ConsumerProtocolInterface
c *AvgCalculatorConfig
}

func NewAvgCalculator(toInternalSaversChannels []queueProtocol.ProducerProtocolInterface, pricesConsumer queueProtocol.ConsumerProtocolInterface) *AvgCalculator {
return &AvgCalculator{toInternalSaversChannels: toInternalSaversChannels, pricesConsumer: pricesConsumer}
func NewAvgCalculator(toJourneySavers []queueProtocol.ProducerProtocolInterface, pricesConsumer queueProtocol.ConsumerProtocolInterface, c *AvgCalculatorConfig) *AvgCalculator {
return &AvgCalculator{toJourneySavers: toJourneySavers, pricesConsumer: pricesConsumer, c: c}
}

// CalculateAvgLoop Waits for the final results from the journey savers,
Expand All @@ -24,8 +25,9 @@ func (a *AvgCalculator) CalculateAvgLoop() {
for {
sumOfPrices := float32(0)
sumOfRows := 0
log.Infof("AvgCalculator | Starting await of internalSavers | Now waiting for %v savers...", len(a.toInternalSaversChannels))
for sentResults := 0; sentResults < len(a.toInternalSaversChannels); sentResults++ {
log.Infof("AvgCalculator | Starting await of internalSavers | Now waiting for %v savers...", len(a.toJourneySavers))

for sentResults := uint(0); sentResults < a.c.SaversCount; sentResults++ {
msg, ok := a.pricesConsumer.Pop()
if !ok {
log.Errorf("AvgCalculator | Consumer closed when not expected, exiting average calculator")
Expand Down Expand Up @@ -68,7 +70,7 @@ func (a *AvgCalculator) sendToJourneySavers(avg float32) {
dynMap[utils.FinalAvg] = avgBytes
data := []*dataStructure.DynamicMap{dataStructure.NewDynamicMap(dynMap)}
msg := &dataStructure.Message{TypeMessage: dataStructure.FinalAvgMsg, DynMaps: data}
for i, channel := range a.toInternalSaversChannels {
for i, channel := range a.toJourneySavers {
log.Infof("AvgCalculator | Sending average to saver %v", i)
err := channel.Send(msg)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestShouldReturnZeroIfTheSumIsZero(t *testing.T) {
func TestShouldSendTheAverageToTheConsumers(t *testing.T) {
chan1 := make(chan *dataStructures.Message, 1)
chan2 := make(chan *dataStructures.Message, 1)
avgCalculator := &AvgCalculator{toInternalSaversChannels: []queueProtocol.ProducerProtocolInterface{queueProtocol.NewProducerChannel(chan1), queueProtocol.NewProducerChannel(chan2)}}
avgCalculator := &AvgCalculator{toJourneySavers: []queueProtocol.ProducerProtocolInterface{queueProtocol.NewProducerChannel(chan1), queueProtocol.NewProducerChannel(chan2)}}

go avgCalculator.sendToJourneySavers(5)

Expand Down
3 changes: 0 additions & 3 deletions ex4/config.yaml → avg_calculator_ex4/config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@

log:
level: "info"
internal:
savers:
count: 6
rabbitmq:
address: "amqp://guest:guest@rabbitmq:5672/"
3 changes: 3 additions & 0 deletions avg_calculator_ex4/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module avg_calculator_ex4

go 1.21
34 changes: 34 additions & 0 deletions avg_calculator_ex4/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"
"github.com/brunograssano/Distribuidos-TP1/common/middleware"
queueProtocol "github.com/brunograssano/Distribuidos-TP1/common/protocol/queues"
"github.com/brunograssano/Distribuidos-TP1/common/utils"
log "github.com/sirupsen/logrus"
)

func main() {
sigs := utils.CreateSignalListener()
env, err := InitEnv()
if err != nil {
log.Fatalf("Main - Ex4 Avg Calculator | Error initializing env | %s", err)
}
config, err := GetConfig(env)
if err != nil {
log.Fatalf("Main - Ex4 Avg Calculator | Error initializing Config | %s", err)
}

qMiddleware := middleware.NewQueueMiddleware(config.RabbitAddress)
inputQueue := queueProtocol.NewConsumerQueueProtocolHandler(qMiddleware.CreateConsumer(config.InputQueueName, true))
var toJourneySavers []queueProtocol.ProducerProtocolInterface
for i := uint(0); i < config.SaversCount; i++ {
producer := qMiddleware.CreateExchangeProducer(config.OutputQueueName, fmt.Sprintf("%v", i), "direct", true)
toJourneySavers = append(toJourneySavers, queueProtocol.NewProducerQueueProtocolHandler(producer))
}

avgCalculator := NewAvgCalculator(toJourneySavers, inputQueue, config)
go avgCalculator.CalculateAvgLoop()
<-sigs
qMiddleware.Close()
}
4 changes: 2 additions & 2 deletions common/middleware/consumermiddleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func (queue *Consumer) Pop() ([]byte, bool) {
return msg.Body, ok
}

func (queue *Consumer) BindTo(nameExchange string, routingKey string) error {
func (queue *Consumer) BindTo(nameExchange string, routingKey string, kind string) error {
err := queue.rabbitMQChannel.ExchangeDeclare(
nameExchange,
"fanout",
kind,
true,
false,
false,
Expand Down
2 changes: 1 addition & 1 deletion common/middleware/queueinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ type ProducerInterface interface {

type ConsumerInterface interface {
Pop() ([]byte, bool)
BindTo(nameExchange string, routingKey string) error
BindTo(nameExchange string, routingKey string, kind string) error
}
2 changes: 1 addition & 1 deletion common/protocol/queues/eofhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (m *mockConsumerQueueProtocolHandler) Pop() (*dataStructures.Message, bool)
return msg, ok
}

func (m *mockConsumerQueueProtocolHandler) BindTo(_ string, _ string) error {
func (m *mockConsumerQueueProtocolHandler) BindTo(_ string, _ string, _ string) error {
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion compose-generator/compose_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ def generate_compose():
processors = 0
distances = 0
completers = 0
ex4Savers = 0
ex4Dispatchers = 0

while reducers1 < 1:
reducers1 = int(input("How many reducers for ex1? "))
Expand All @@ -26,13 +28,19 @@ def generate_compose():
while completers < 1:
completers = int(input("How many distance completers? "))

while ex4Savers < 1:
ex4Savers = int(input("How many journey savers for ex4? "))

while ex4Dispatchers < 1:
ex4Dispatchers = int(input("How many dispatchers for ex4? "))

env = Environment(loader=FileSystemLoader("templates/"))

template = env.get_template("docker-compose-template.yaml")

filename = f"generated/docker-compose.yaml"
with open(filename, mode="w", encoding="utf-8") as output:
output.write(template.render(reducers1=reducers1, reducers2=reducers2, stopovers=stopovers, processors=processors, distances=distances, completers=completers))
output.write(template.render(reducers1=reducers1, reducers2=reducers2, stopovers=stopovers, processors=processors, distances=distances, completers=completers, ex4Savers=ex4Savers, ex4Dispatchers=ex4Dispatchers))

print(f"Wrote docker compose to {filename}")

Expand Down
Loading

0 comments on commit 824b93b

Please sign in to comment.