Skip to content

Commit

Permalink
Merge pull request apache#1189 from walterlife/eventmesh-catalog
Browse files Browse the repository at this point in the history
[ISSUE apache#1090] Support for managing EventMesh events using AsyncAPI
  • Loading branch information
qqeasonchen authored Aug 30, 2022
2 parents ccfc060 + 42e8f7c commit 99facc2
Show file tree
Hide file tree
Showing 25 changed files with 3,122 additions and 0 deletions.
99 changes: 99 additions & 0 deletions eventmesh-catalog-go/.golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

linters-settings:
funlen:
lines: 80
statements: 80
goconst:
min-len: 2
min-occurrences: 3
gocyclo:
min-complexity: 20
golint:
min-confidence: 0
govet:
check-shadowing: true
lll:
line-length: 120
errcheck:
check-type-assertions: false
gocritic:
enabled-checks:
- nestingReduce
settings:
nestingReduce:
bodyWidth: 5

linters:
disable-all: true
enable:
- deadcode
- funlen
- goconst
- gocyclo
- gofmt
- ineffassign
- staticcheck
- structcheck
- goimports
- revive
- gosimple
- govet
- typecheck
- lll
- rowserrcheck
- errcheck
- unused
- varcheck
- sqlclosecheck

run:
timeout: 20m

issues:
exclude-use-default: true

# The list of ids of default excludes to include or disable. By default it's empty.
include:
- EXC0004 # govet (possible misuse of unsafe.Pointer|should have signature)
- EXC0005 # staticcheck ineffective break statement. Did you mean to break out of the outer loop
- EXC0011 # stylecheck (comment on exported (method|function|type|const)|should have( a package)? comment|comment should be of the form)
- EXC0012 # revive exported (method|function|type|const) (.+) should have comment or be unexported

exclude-rules:
- path: _test\.go
linters:
- funlen
- linters:
- staticcheck
text: "SA6002: argument should be pointer-like to avoid allocations" # sync.pool.Put(buf), slice `var buf []byte` will tiger this
- linters:
- structcheck
text: "Store` is unused"
- linters:
- lll
source: "^//go:generate "

max-same-issues: 0
new: false
max-issues-per-linter: 0

output:
sort-results: true

service:
golangci-lint-version: 1.23.x
40 changes: 40 additions & 0 deletions eventmesh-catalog-go/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

lint:
golangci-lint run --tests=false
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ${SERVER}
# goimports
# -e report all errors (not just the first 10 on different lines)
# -d display diffs instead of rewriting files
# -local put imports beginning with this string after 3rd-party packages; comma-separated list
# gofmt
# -e report all errors (not just the first 10 on different lines)
# -d display diffs instead of rewriting files
# -s simplify code
# -w write result to (source) file instead of stdout
fmt:
find . -name "*.go" | xargs goimports -e -d -local git.code.oa.com -w && \
find . -name "*.go" | xargs gofmt -e -d -s -w
test:
go test -v ./... -gcflags "all=-N -l"

cover:
go test ./... -gcflags "all=-N -l" --covermode=count -coverprofile=cover.out.tmp
cat cover.out.tmp | grep -v "_mock.go" | grep -v ".pb.go" > cover.out
rm cover.out.tmp
go tool cover -html=cover.out
112 changes: 112 additions & 0 deletions eventmesh-catalog-go/api/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-catalog-go/api/proto"
"github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-catalog-go/internal/util"
"github.com/apache/incubator-eventmesh/eventmesh-catalog-go/pkg/asyncapi"
v2 "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/pkg/asyncapi/v2"
"github.com/gogf/gf/util/gconv"
"gorm.io/gorm"
"time"
)

type catalogImpl struct {
proto.UnimplementedCatalogServer
catalogDAL dal.CatalogDAL
}

func NewCatalogImpl() proto.CatalogServer {
var c catalogImpl
c.catalogDAL = dal.NewCatalogDAL()
return &c
}

func (c *catalogImpl) Registry(ctx context.Context, request *proto.RegistryRequest) (*proto.RegistryResponse, error) {
var rsp = &proto.RegistryResponse{}
if len(request.Definition) == 0 {
return rsp, nil
}
var doc = new(v2.Document)
if err := v2.Decode(gconv.Bytes(request.Definition), &doc); err != nil {
return rsp, err
}
if len(doc.Channels()) == 0 {
return rsp, nil
}
if err := dal.GetDalClient().Transaction(func(tx *gorm.DB) error {
var handlers []func() error
for _, channel := range doc.Channels() {
for _, operation := range channel.Operations() {
var record = c.buildEventCatalog(request.FileName, channel, operation)
handlers = append(handlers, func() error {
return c.catalogDAL.Insert(context.Background(), tx, record)
})
}
}
handlers = append(handlers, func() error {
return c.catalogDAL.InsertEvent(context.Background(), tx, c.buildEvent(request.FileName,
request.Definition, doc))
})
return util.GoAndWait(handlers...)
}); err != nil {
return rsp, err
}
return rsp, nil
}

func (c *catalogImpl) Query(ctx context.Context, in *proto.QueryRequest) (*proto.QueryResponse, error) {
var rsp = &proto.QueryResponse{}
res, err := c.catalogDAL.Select(ctx, in.OperationId)
if err != nil {
return rsp, err
}
rsp.Type = res.Type
rsp.ChannelName = res.ChannelName
rsp.Schema = res.Schema
return rsp, nil
}

func (c *catalogImpl) buildEventCatalog(fileName string, channel asyncapi.Channel,
operation asyncapi.Operation) *model.EventCatalog {
var record model.EventCatalog
record.OperationID = fmt.Sprintf("file://%s#%s", fileName, operation.ID())
record.ChannelName = channel.ID()
record.Type = string(operation.Type())
record.Status = constants.NormalStatus
record.CreateTime = time.Now()
record.UpdateTime = time.Now()
record.Schema = gconv.String(operation.Messages()[0].Payload())
return &record
}

func (c *catalogImpl) buildEvent(fileName string, definition string, doc asyncapi.Document) *model.Event {
var record model.Event
record.Title = doc.Info().Title()
record.FileName = fileName
record.Definition = definition
record.Status = constants.NormalStatus
record.Version = doc.Info().Version()
record.CreateTime = time.Now()
record.UpdateTime = time.Now()
return &record
}
Loading

0 comments on commit 99facc2

Please sign in to comment.