Skip to content

Commit

Permalink
chore: updated and fix proto file generation
Browse files Browse the repository at this point in the history
  • Loading branch information
hitesh22rana committed Dec 23, 2024
1 parent 0be2979 commit 41e88f4
Show file tree
Hide file tree
Showing 23 changed files with 753 additions and 850 deletions.
599 changes: 599 additions & 0 deletions .gen/go/mq/mq.pb.go

Large diffs are not rendered by default.

33 changes: 16 additions & 17 deletions pkg/proto/broker/broker_grpc.pb.go → .gen/go/mq/mq_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ jobs:
packages: write
steps:
- name: Check out repository
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3

- name: Log in to GitHub Packages
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ RUN go mod download
COPY . .

# Compile the protocol buffer files and generate the Go files
RUN rm -rf pkg/proto && mkdir -p pkg/proto && \
RUN rm -rf .gen/ && mkdir -p .gen/go && \
for file in proto/*.proto; do \
base=$(basename $file); \
name=${base%.*}; \
mkdir -p pkg/proto/$name; \
protoc --go_out=paths=source_relative:./pkg/proto/$name --go-grpc_out=paths=source_relative:./pkg/proto/$name \
mkdir -p .gen/go/$name; \
protoc --go_out=paths=source_relative:.gen/go/$name --go-grpc_out=paths=source_relative:.gen/go/$name \
--proto_path=proto $file; \
done

Expand Down
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@

SHELL := /bin/bash -o pipefail

.PHONY: dependencies
dependencies:
@go mod tidy

.PHONY: generate-proto
generate-proto: dependencies
@rm -rf pkg/proto && mkdir -p pkg/proto && for file in proto/*.proto; do \
.PHONY: generate-proto-go
generate-proto-go:
@rm -rf .gen/ && mkdir -p .gen/go && for file in proto/*.proto; do \
base=$$(basename $$file); \
name=$${base%.*}; \
mkdir -p pkg/proto/$$name; \
protoc --go_out=paths=source_relative:./pkg/proto/$$name --go-grpc_out=paths=source_relative:./pkg/proto/$$name \
mkdir -p .gen/go/$$name; \
protoc --go_out=paths=source_relative:.gen/go/$$name --go-grpc_out=paths=source_relative:.gen/go/$$name \
--proto_path=proto $$file; \
done

.PHONY: dependencies
dependencies: generate-proto-go
@go mod tidy

.PHONY: build-broker
build-broker: generate-proto
build-broker: dependencies
@go build -o bin/broker cmd/broker/main.go

.PHONY: build-publisher
build-publisher: generate-proto
build-publisher: dependencies
@go build -o bin/publisher cmd/publisher/main.go

.PHONY: build-subscriber
build-subscriber: generate-proto
build-subscriber: dependencies
@go build -o bin/subscriber cmd/subscriber/main.go

.PHONY: build-all
Expand Down
Binary file modified bin/broker
Binary file not shown.
Binary file modified bin/publisher
Binary file not shown.
Binary file modified bin/subscriber
Binary file not shown.
8 changes: 4 additions & 4 deletions cmd/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

pb "github.com/hitesh22rana/mq/.gen/go/mq"
"github.com/hitesh22rana/mq/internal/config"
"github.com/hitesh22rana/mq/internal/logger"
"github.com/hitesh22rana/mq/pkg/proto/broker"
)

func main() {
Expand Down Expand Up @@ -54,7 +54,7 @@ func main() {
}
defer conn.Close()

client := broker.NewBrokerServiceClient(conn)
client := pb.NewBrokerServiceClient(conn)
log.Info("info: publisher client started successfully")

// Create a new stream
Expand All @@ -67,7 +67,7 @@ func main() {
// Create a new channel
if _, err = client.CreateChannel(
context.Background(),
&broker.CreateChannelRequest{
&pb.CreateChannelRequest{
Channel: channel,
},
); err != nil {
Expand All @@ -91,7 +91,7 @@ func main() {
content = content[:len(content)-1] // Remove the newline character
_, err := client.Publish(
context.Background(),
&broker.PublishRequest{
&pb.PublishRequest{
Channel: channel,
Content: content,
},
Expand Down
13 changes: 6 additions & 7 deletions cmd/subscriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

pb "github.com/hitesh22rana/mq/.gen/go/mq"
"github.com/hitesh22rana/mq/internal/config"
"github.com/hitesh22rana/mq/internal/logger"
"github.com/hitesh22rana/mq/pkg/proto/broker"
"github.com/hitesh22rana/mq/pkg/proto/event"
)

func main() {
Expand Down Expand Up @@ -51,7 +50,7 @@ func main() {
)
}
defer conn.Close()
client := broker.NewBrokerServiceClient(conn)
client := pb.NewBrokerServiceClient(conn)
log.Info("info: subscriber client started successfully")

// Create a new stream
Expand All @@ -65,11 +64,11 @@ func main() {
startOffset, _ := reader.ReadString('\n')
startOffset = strings.Replace(startOffset, "\n", "", -1)

var offset event.Offset = 1
var offset pb.Offset = 1
if startOffset == "0" {
offset = event.Offset_OFFSET_BEGINNING
offset = pb.Offset_OFFSET_BEGINNING
} else if startOffset == "1" {
offset = event.Offset_OFFSET_LATEST
offset = pb.Offset_OFFSET_LATEST
} else {
log.Fatal("fatal: invalid offset")
}
Expand All @@ -79,7 +78,7 @@ func main() {
defer cancel()

// Subscribe to the channel
stream, err := client.Subscribe(ctx, &broker.SubscribeRequest{
stream, err := client.Subscribe(ctx, &pb.SubscribeRequest{
Channel: channel,
Offset: offset,
PullInterval: cfg.Subscriber.SubscriberDataPullingInterval,
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ services:
container_name: mq

volumes:
data:
data:
external: true
12 changes: 6 additions & 6 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"go.uber.org/zap"

"github.com/hitesh22rana/mq/pkg/proto/event"
pb "github.com/hitesh22rana/mq/.gen/go/mq"
"github.com/hitesh22rana/mq/pkg/storage"
"github.com/hitesh22rana/mq/pkg/utils"
)
Expand All @@ -34,9 +34,9 @@ var (
// Broker defines the interface for the message broker
type Broker interface {
createChannel(context.Context, channel) error
publish(context.Context, channel, *event.Message) error
subscribe(context.Context, *event.Subscriber, event.Offset, uint64, channel, chan<- *event.Message) error
unsubscribe(context.Context, *event.Subscriber, channel) error
publish(context.Context, channel, *pb.Message) error
subscribe(context.Context, *pb.Subscriber, pb.Offset, uint64, channel, chan<- *pb.Message) error
unsubscribe(context.Context, *pb.Subscriber, channel) error
}

// Channel represents a message channel
Expand All @@ -47,7 +47,7 @@ type Service struct {
mu sync.RWMutex
logger *zap.Logger
storage storage.Storage
channelToSubscribers map[channel]map[*event.Subscriber]struct{}
channelToSubscribers map[channel]map[*pb.Subscriber]struct{}
}

// ServiceOptions represents the options for the broker service
Expand All @@ -61,7 +61,7 @@ func NewService(logger *zap.Logger, options *ServiceOptions) *Service {
mu: sync.RWMutex{},
logger: logger,
storage: options.Storage,
channelToSubscribers: make(map[channel]map[*event.Subscriber]struct{}),
channelToSubscribers: make(map[channel]map[*pb.Subscriber]struct{}),
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/broker/create_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hitesh22rana/mq/pkg/proto/broker"
pb "github.com/hitesh22rana/mq/.gen/go/mq"
)

// createChannel creates a new channel, if it doesn't already exist else joins the existing channel
Expand Down Expand Up @@ -51,7 +51,7 @@ type createChannelInput struct {
}

// gRPC implementation of the CreateChannel method
func (s *Server) CreateChannel(ctx context.Context, req *broker.CreateChannelRequest) (*broker.CreateChannelResponse, error) {
func (s *Server) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest) (*pb.CreateChannelResponse, error) {
input := &createChannelInput{
channel: req.GetChannel(),
}
Expand All @@ -66,5 +66,5 @@ func (s *Server) CreateChannel(ctx context.Context, req *broker.CreateChannelReq
return nil, status.Error(codes.Internal, "unable to create channel")
}

return &broker.CreateChannelResponse{}, nil
return &pb.CreateChannelResponse{}, nil
}
12 changes: 6 additions & 6 deletions pkg/broker/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"

"github.com/hitesh22rana/mq/pkg/proto/broker"
pb "github.com/hitesh22rana/mq/.gen/go/mq"
)

type contextKey string
Expand All @@ -18,7 +18,7 @@ const IPContextKey contextKey = "ip"

// GrpcServer is the gRPC server
type GrpcServer struct {
broker.UnimplementedBrokerServiceServer
pb.UnimplementedBrokerServiceServer
server *Server
}

Expand All @@ -35,7 +35,7 @@ func NewGrpcServer(options *GrpcServerOptions) *grpc.Server {
grpc.UnaryInterceptor(UnaryIPInterceptor),
grpc.MaxRecvMsgSize(options.MaxRecvMsgSize),
)
broker.RegisterBrokerServiceServer(
pb.RegisterBrokerServiceServer(
s,
&GrpcServer{server: options.Server},
)
Expand All @@ -53,16 +53,16 @@ func UnaryIPInterceptor(ctx context.Context, req interface{}, info *grpc.UnarySe
}

// CreateChannel gRPC endpoint
func (gRPC *GrpcServer) CreateChannel(ctx context.Context, req *broker.CreateChannelRequest) (*broker.CreateChannelResponse, error) {
func (gRPC *GrpcServer) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest) (*pb.CreateChannelResponse, error) {
return gRPC.server.CreateChannel(ctx, req)
}

// Publish gRPC endpoint
func (gRPC *GrpcServer) Publish(ctx context.Context, req *broker.PublishRequest) (*broker.PublishResponse, error) {
func (gRPC *GrpcServer) Publish(ctx context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error) {
return gRPC.server.Publish(ctx, req)
}

// Subscribe gRPC endpoint
func (gRPC *GrpcServer) Subscribe(req *broker.SubscribeRequest, stream broker.BrokerService_SubscribeServer) error {
func (gRPC *GrpcServer) Subscribe(req *pb.SubscribeRequest, stream pb.BrokerService_SubscribeServer) error {
return gRPC.server.Subscribe(req, stream)
}
Loading

0 comments on commit 41e88f4

Please sign in to comment.