From 7b5ba98fb51a0c72cac86503bc1f217ae28a1750 Mon Sep 17 00:00:00 2001 From: xiangwanpeng Date: Thu, 5 Jan 2023 18:08:20 +0800 Subject: [PATCH 1/6] feat: concatenate read-write process --- .gitignore | 3 +- internal/indexlib/base.go | 2 +- internal/indexlib/bluge/reader.go | 3 + internal/indexlib/manage/manage.go | 28 ++++---- internal/indexlib/manage/reader_test.go | 22 +++---- internal/indexlib/manage/writer_test.go | 26 ++++---- internal/indexlib/query_request.go | 6 ++ internal/ingestion/handler/ingest_handler.go | 25 +++++++ .../{ => handler}/ingest_handler_test.go | 6 +- internal/ingestion/ingest_doc.go | 50 ++++++++++++++ internal/ingestion/ingest_handler.go | 19 ------ internal/meta/handler/index_handler.go | 4 +- internal/meta/metadata/index.go | 8 +-- internal/meta/metadata/storage/boltdb.go | 6 +- internal/{meta => protocol}/index.go | 2 +- .../{ingestion => protocol}/ingest_request.go | 2 +- .../ingest_response.go | 2 +- internal/{query => protocol}/query_request.go | 8 +-- .../{query => protocol}/query_response.go | 4 +- internal/{meta => protocol}/shard.go | 2 +- internal/query/handler/query_handler.go | 30 +++++++++ .../query/{ => handler}/query_handler_test.go | 4 +- internal/query/query_doc.go | 66 +++++++++++++++++++ internal/query/query_handler.go | 19 ------ internal/service/router.go | 14 ++-- 25 files changed, 252 insertions(+), 109 deletions(-) create mode 100644 internal/ingestion/handler/ingest_handler.go rename internal/ingestion/{ => handler}/ingest_handler_test.go (77%) create mode 100644 internal/ingestion/ingest_doc.go delete mode 100644 internal/ingestion/ingest_handler.go rename internal/{meta => protocol}/index.go (98%) rename internal/{ingestion => protocol}/ingest_request.go (92%) rename internal/{ingestion => protocol}/ingest_response.go (89%) rename internal/{query => protocol}/query_request.go (88%) rename internal/{query => protocol}/query_response.go (90%) rename internal/{meta => protocol}/shard.go (96%) create mode 100644 internal/query/handler/query_handler.go rename internal/query/{ => handler}/query_handler_test.go (90%) create mode 100644 internal/query/query_doc.go delete mode 100644 internal/query/query_handler.go diff --git a/.gitignore b/.gitignore index ba80ceb..0693930 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ bin .vscode # tatris test outputs -**/_meta* \ No newline at end of file +**/_meta* +**/_data* \ No newline at end of file diff --git a/internal/indexlib/base.go b/internal/indexlib/base.go index 44a3573..eca50cc 100644 --- a/internal/indexlib/base.go +++ b/internal/indexlib/base.go @@ -11,7 +11,7 @@ const ( TypeField = "_type" BlugeIndexLibType = "bluge" - DefaultDataPath = "./data" + DefaultDataPath = "./_data" FSStorageType = "fs" ) diff --git a/internal/indexlib/bluge/reader.go b/internal/indexlib/bluge/reader.go index 7add97d..c838e36 100644 --- a/internal/indexlib/bluge/reader.go +++ b/internal/indexlib/bluge/reader.go @@ -65,6 +65,9 @@ func (b *BlugeReader) generateQuery(query indexlib.QueryRequest) bluge.Query { var blugeQuery bluge.Query switch query := query.(type) { + case *indexlib.MatchAllQuery: + q := bluge.NewMatchAllQuery() + blugeQuery = q case *indexlib.MatchQuery: q := bluge.NewMatchQuery(query.Match) if query.Field != "" { diff --git a/internal/indexlib/manage/manage.go b/internal/indexlib/manage/manage.go index 6374cf3..ec72107 100644 --- a/internal/indexlib/manage/manage.go +++ b/internal/indexlib/manage/manage.go @@ -4,6 +4,7 @@ package manage import ( + "errors" "github.com/tatris-io/tatris/internal/indexlib" "github.com/tatris-io/tatris/internal/indexlib/bluge" "log" @@ -18,15 +19,15 @@ func init() { writerPool = make(map[string]indexlib.Writer) } -func GetReader(config *indexlib.BaseConfig) indexlib.Reader { +func GetReader(config *indexlib.BaseConfig) (indexlib.Reader, error) { if config.Index == "" { - return nil + return nil, errors.New("no index specified") } baseConfig := indexlib.NewBaseConfig(config) key := getKey(baseConfig) if reader, found := readerPool[key]; found { - return reader + return reader, nil } switch baseConfig.IndexLibType { @@ -35,26 +36,24 @@ func GetReader(config *indexlib.BaseConfig) indexlib.Reader { err := blugeReader.OpenReader() if err != nil { log.Printf("bluge open reader error: %s", err) - return nil + return nil, err } readerPool[key] = blugeReader - return blugeReader + return blugeReader, nil default: - log.Printf("index lib not support") + return nil, errors.New("index lib not support") } - - return nil } -func GetWriter(config *indexlib.BaseConfig) indexlib.Writer { +func GetWriter(config *indexlib.BaseConfig) (indexlib.Writer, error) { if config.Index == "" { - return nil + return nil, errors.New("no index specified") } baseConfig := indexlib.NewBaseConfig(config) key := getKey(baseConfig) if writer, found := writerPool[key]; found { - return writer + return writer, nil } switch baseConfig.IndexLibType { @@ -63,15 +62,14 @@ func GetWriter(config *indexlib.BaseConfig) indexlib.Writer { err := blugeWriter.OpenWriter() if err != nil { log.Printf("bluge open writer error: %s", err) - return nil + return nil, err } writerPool[key] = blugeWriter - return blugeWriter + return blugeWriter, nil default: - log.Printf("index lib not support") + return nil, errors.New("index lib not support") } - return nil } func CloseReader(config *indexlib.BaseConfig) { diff --git a/internal/indexlib/manage/reader_test.go b/internal/indexlib/manage/reader_test.go index 2a90761..e942276 100644 --- a/internal/indexlib/manage/reader_test.go +++ b/internal/indexlib/manage/reader_test.go @@ -10,20 +10,20 @@ import ( func TestRead(t *testing.T) { config := &indexlib.BaseConfig{ - Index: "test", + Index: "storage_product", } - reader := GetReader(config) - if reader == nil { + reader, err := GetReader(config) + if err != nil { t.Log("get reader error!") t.FailNow() - } + } else { + matchQuery := &indexlib.MatchQuery{Match: "tatris", Field: "name"} + resp, err := reader.Search(context.Background(), matchQuery, -1) + if err != nil { + t.Log(err) + } + t.Log(resp) - matchQuery := &indexlib.MatchQuery{Match: "tatris", Field: "name"} - resp, err := reader.Search(context.Background(), matchQuery, -1) - if err != nil { - t.Log(err) + CloseReader(config) } - t.Log(resp) - - CloseReader(config) } diff --git a/internal/indexlib/manage/writer_test.go b/internal/indexlib/manage/writer_test.go index 1f8617c..4557201 100644 --- a/internal/indexlib/manage/writer_test.go +++ b/internal/indexlib/manage/writer_test.go @@ -9,23 +9,21 @@ import ( func TestWrite(t *testing.T) { config := &indexlib.BaseConfig{ - Index: "test", + Index: "storage_product", } - writer := GetWriter(config) - if writer == nil { + if writer, err := GetWriter(config); err != nil { t.Logf("get writer error!") t.FailNow() - } - - doc := make(map[string]interface{}) - doc["name"] = "tatris" - doc["describe"] = "Time-aware storage and search system" + } else { + doc := make(map[string]interface{}) + doc["name"] = "tatris" + doc["desc"] = "Time-aware storage and search system" + err := writer.Insert("storage_product", doc) + if err != nil { + t.Logf("error write index %v", err) + } + t.Log("Write success!") - err := writer.Insert("test", doc) - if err != nil { - t.Logf("error write index %v", err) + CloseWriter(config) } - t.Log("Write success!") - - CloseWriter(config) } diff --git a/internal/indexlib/query_request.go b/internal/indexlib/query_request.go index f07c837..3887eb2 100644 --- a/internal/indexlib/query_request.go +++ b/internal/indexlib/query_request.go @@ -6,6 +6,12 @@ type QueryRequest interface { searcher() } +type MatchAllQuery struct { +} + +func (m *MatchAllQuery) searcher() { +} + type MatchQuery struct { Match string Field string diff --git a/internal/ingestion/handler/ingest_handler.go b/internal/ingestion/handler/ingest_handler.go new file mode 100644 index 0000000..51aa672 --- /dev/null +++ b/internal/ingestion/handler/ingest_handler.go @@ -0,0 +1,25 @@ +// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. + +// Package handler is responsible for handling HTTP requests about ingestion +package handler + +import ( + "github.com/gin-gonic/gin" + "github.com/tatris-io/tatris/internal/ingestion" + "github.com/tatris-io/tatris/internal/protocol" + "net/http" +) + +func IngestHandler(c *gin.Context) { + indexName := c.Param("index") + ingestRequest := protocol.IngestRequest{} + if err := c.ShouldBind(&ingestRequest); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) + } + ingestRequest.Index = indexName + if err := ingestion.IngestDocs(indexName, ingestRequest.Documents); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) + } else { + c.JSON(http.StatusOK, nil) + } +} diff --git a/internal/ingestion/ingest_handler_test.go b/internal/ingestion/handler/ingest_handler_test.go similarity index 77% rename from internal/ingestion/ingest_handler_test.go rename to internal/ingestion/handler/ingest_handler_test.go index 6dca9e4..7481ca4 100644 --- a/internal/ingestion/ingest_handler_test.go +++ b/internal/ingestion/handler/ingest_handler_test.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package ingestion +package handler import ( "bytes" @@ -26,10 +26,10 @@ func TestIngestHandler(t *testing.T) { } c.Request = req p := gin.Params{} - p = append(p, gin.Param{Key: "index", Value: "index_1"}) + p = append(p, gin.Param{Key: "index", Value: "storage_product"}) c.Params = p c.Request.Header.Set("Content-Type", "application/json;charset=utf-8") - c.Request.Body = io.NopCloser(bytes.NewBufferString("{\"documents\":[{\"name\":\"Bob\",\"age\":12},{\"name\":\"Peter\",\"age\":20}]}")) + c.Request.Body = io.NopCloser(bytes.NewBufferString("{\"documents\":[{\"name\":\"tatris\",\"desc\":\"Time-aware storage and search system\"},{\"name\":\"mysql\",\"desc\":\"Relational database\"}]}")) IngestHandler(c) fmt.Println(w) assert.Equal(t, http.StatusOK, w.Code) diff --git a/internal/ingestion/ingest_doc.go b/internal/ingestion/ingest_doc.go new file mode 100644 index 0000000..6a80820 --- /dev/null +++ b/internal/ingestion/ingest_doc.go @@ -0,0 +1,50 @@ +// Copyright 2023 Tatris Project Authors. Licensed under Apache-2.0. + +package ingestion + +import ( + "crypto/rand" + "fmt" + "github.com/tatris-io/tatris/internal/indexlib" + "github.com/tatris-io/tatris/internal/indexlib/manage" + "log" + "os" +) + +var wd, _ = os.Getwd() + +// TODO: make it configurable +var dataPath = wd + "/../../../_data" + +func IngestDocs(idxName string, docs []map[string]interface{}) error { + config := &indexlib.BaseConfig{ + Index: idxName, + DataPath: dataPath, + } + writer, err := manage.GetWriter(config) + if err != nil { + return err + } + docsWithID := make(map[string]map[string]interface{}) + for _, doc := range docs { + docID := "" + if id, ok := doc["_id"]; ok && id != nil && id != "" { + docID = id.(string) + } else { + docID = generateID() + } + docsWithID[docID] = doc + } + return writer.Batch(docsWithID) +} + +// TODO: distributed ID +func generateID() string { + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + log.Fatal(err) + } + return fmt.Sprintf("%x-%x-%x-%x-%x", + b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) +} diff --git a/internal/ingestion/ingest_handler.go b/internal/ingestion/ingest_handler.go deleted file mode 100644 index db7fbd0..0000000 --- a/internal/ingestion/ingest_handler.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. - -package ingestion - -import ( - "github.com/gin-gonic/gin" - "net/http" -) - -func IngestHandler(c *gin.Context) { - indexName := c.Param("index") - ingestRequest := IngestRequest{} - if err := c.ShouldBind(&ingestRequest); err != nil { - c.String(http.StatusBadRequest, `invalid request`) - } - ingestRequest.Index = indexName - // TODO do ingestion... - c.JSON(http.StatusOK, ingestRequest) -} diff --git a/internal/meta/handler/index_handler.go b/internal/meta/handler/index_handler.go index 0bb4fe2..b703fe1 100644 --- a/internal/meta/handler/index_handler.go +++ b/internal/meta/handler/index_handler.go @@ -5,14 +5,14 @@ package handler import ( "github.com/gin-gonic/gin" - "github.com/tatris-io/tatris/internal/meta" "github.com/tatris-io/tatris/internal/meta/metadata" + "github.com/tatris-io/tatris/internal/protocol" "net/http" ) func CreateIndexHandler(c *gin.Context) { idxName := c.Param("index") - idx := meta.Index{} + idx := protocol.Index{} if err := c.ShouldBind(&idx); err != nil { c.JSON(http.StatusBadRequest, gin.H{"msg": "invalid request"}) } diff --git a/internal/meta/metadata/index.go b/internal/meta/metadata/index.go index 8afe987..da50f62 100644 --- a/internal/meta/metadata/index.go +++ b/internal/meta/metadata/index.go @@ -5,8 +5,8 @@ package metadata import ( json "encoding/json" - "github.com/tatris-io/tatris/internal/meta" "github.com/tatris-io/tatris/internal/meta/metadata/storage" + "github.com/tatris-io/tatris/internal/protocol" ) var metaStore storage.MetaStore @@ -15,7 +15,7 @@ func init() { metaStore, _ = storage.Open() } -func Create(idx *meta.Index) error { +func Create(idx *protocol.Index) error { json, err := json.Marshal(idx) if err != nil { return err @@ -23,13 +23,13 @@ func Create(idx *meta.Index) error { return metaStore.Set(fillKey(idx.Name), json) } -func Get(idxName string) (*meta.Index, error) { +func Get(idxName string) (*protocol.Index, error) { if b, err := metaStore.Get(fillKey(idxName)); err != nil { return nil, err } else if b == nil { return nil, nil } else { - idx := new(meta.Index) + idx := new(protocol.Index) if err := json.Unmarshal(b, idx); err != nil { return nil, err } diff --git a/internal/meta/metadata/storage/boltdb.go b/internal/meta/metadata/storage/boltdb.go index 35296a7..5749efd 100644 --- a/internal/meta/metadata/storage/boltdb.go +++ b/internal/meta/metadata/storage/boltdb.go @@ -11,6 +11,10 @@ import ( "github.com/boltdb/bolt" ) +const ( + BoltMetaPath = "./_meta.bolt" +) + type BoltMetaStore struct { db *bolt.DB } @@ -18,7 +22,7 @@ type BoltMetaStore struct { func Open() (*BoltMetaStore, error) { // Open the data file in your current directory. // It will be created if it doesn't exist. - db, err := bolt.Open("_meta.bolt", 0600, &bolt.Options{Timeout: 1 * time.Second}) + db, err := bolt.Open(BoltMetaPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { return nil, err } diff --git a/internal/meta/index.go b/internal/protocol/index.go similarity index 98% rename from internal/meta/index.go rename to internal/protocol/index.go index 375422b..d8bcfd2 100644 --- a/internal/meta/index.go +++ b/internal/protocol/index.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package meta +package protocol type Index struct { // index name diff --git a/internal/ingestion/ingest_request.go b/internal/protocol/ingest_request.go similarity index 92% rename from internal/ingestion/ingest_request.go rename to internal/protocol/ingest_request.go index a057373..15a9306 100644 --- a/internal/ingestion/ingest_request.go +++ b/internal/protocol/ingest_request.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package ingestion +package protocol type IngestRequest struct { Index string `json:"index"` diff --git a/internal/ingestion/ingest_response.go b/internal/protocol/ingest_response.go similarity index 89% rename from internal/ingestion/ingest_response.go rename to internal/protocol/ingest_response.go index 8d79341..71281c4 100644 --- a/internal/ingestion/ingest_response.go +++ b/internal/protocol/ingest_response.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package ingestion +package protocol type IngestResponse struct { Took int64 `json:"took"` diff --git a/internal/query/query_request.go b/internal/protocol/query_request.go similarity index 88% rename from internal/query/query_request.go rename to internal/protocol/query_request.go index f60464e..09b8f5a 100644 --- a/internal/query/query_request.go +++ b/internal/protocol/query_request.go @@ -1,8 +1,7 @@ -// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. +// Copyright 2023 Tatris Project Authors. Licensed under Apache-2.0. -package query - -// TODO: too many query type to be defined +// Package protocol describes the core data structures and calling conventions of Tatris +package protocol type QueryRequest struct { Index string `json:"index"` @@ -10,6 +9,7 @@ type QueryRequest struct { Size int64 `json:"size"` } +// TODO: to be supplemented type Query struct { // "match_all": {} MatchAll *MatchAll `json:"match_all,omitempty"` diff --git a/internal/query/query_response.go b/internal/protocol/query_response.go similarity index 90% rename from internal/query/query_response.go rename to internal/protocol/query_response.go index aebf8b3..5a79a59 100644 --- a/internal/query/query_response.go +++ b/internal/protocol/query_response.go @@ -1,6 +1,6 @@ -// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. +// Copyright 2023 Tatris Project Authors. Licensed under Apache-2.0. -package query +package protocol type QueryResponse struct { Took int64 `json:"took"` diff --git a/internal/meta/shard.go b/internal/protocol/shard.go similarity index 96% rename from internal/meta/shard.go rename to internal/protocol/shard.go index 628b740..5904f45 100644 --- a/internal/meta/shard.go +++ b/internal/protocol/shard.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package meta +package protocol // Shard is a logical split of the index type Shard struct { diff --git a/internal/query/handler/query_handler.go b/internal/query/handler/query_handler.go new file mode 100644 index 0000000..8c540fe --- /dev/null +++ b/internal/query/handler/query_handler.go @@ -0,0 +1,30 @@ +// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. + +// Package handler is responsible for handling HTTP requests about query +package handler + +import ( + "github.com/gin-gonic/gin" + "github.com/tatris-io/tatris/internal/protocol" + "github.com/tatris-io/tatris/internal/query" + "net/http" +) + +func QueryHandler(c *gin.Context) { + indexName := c.Param("index") + queryRequest := protocol.QueryRequest{Size: 10} + if err := c.ShouldBind(&queryRequest); err != nil { + c.String(http.StatusBadRequest, `invalid request`) + } + queryRequest.Index = indexName + hits, err := query.SearchDocs(queryRequest) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) + } else { + result := protocol.QueryResponse{} + result.Took = 0 + result.Hits = *hits + c.JSON(http.StatusOK, result) + } + +} diff --git a/internal/query/query_handler_test.go b/internal/query/handler/query_handler_test.go similarity index 90% rename from internal/query/query_handler_test.go rename to internal/query/handler/query_handler_test.go index a83fe05..dd5de1c 100644 --- a/internal/query/query_handler_test.go +++ b/internal/query/handler/query_handler_test.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package query +package handler import ( "bytes" @@ -26,7 +26,7 @@ func TestQueryHandler(t *testing.T) { } c.Request = req p := gin.Params{} - p = append(p, gin.Param{Key: "index", Value: "index_1"}) + p = append(p, gin.Param{Key: "index", Value: "storage_product"}) c.Params = p c.Request.Header.Set("Content-Type", "application/json;charset=utf-8") c.Request.Body = io.NopCloser(bytes.NewBufferString("{\"query\":{\"match_all\":{}},\"size\":20}")) diff --git a/internal/query/query_doc.go b/internal/query/query_doc.go new file mode 100644 index 0000000..1f28c63 --- /dev/null +++ b/internal/query/query_doc.go @@ -0,0 +1,66 @@ +// Copyright 2023 Tatris Project Authors. Licensed under Apache-2.0. + +package query + +import ( + "context" + "errors" + "github.com/tatris-io/tatris/internal/indexlib" + "github.com/tatris-io/tatris/internal/indexlib/manage" + "github.com/tatris-io/tatris/internal/protocol" + "os" +) + +var wd, _ = os.Getwd() + +// TODO: make it configurable +var dataPath = wd + "/../../../_data" + +func SearchDocs(request protocol.QueryRequest) (*protocol.Hits, error) { + config := &indexlib.BaseConfig{ + Index: request.Index, + DataPath: dataPath, + } + reader, err := manage.GetReader(config) + if err != nil { + return nil, err + } + libRequest, err := transform(request) + if err != nil { + return nil, err + } + resp, err := reader.Search(context.Background(), libRequest, -1) + if err != nil { + return nil, err + } + respHits := resp.Hits + hits := &protocol.Hits{ + Total: protocol.Total{Value: respHits.Total.Value, Relation: respHits.Total.Relation}, + } + hits.Hits = make([]protocol.Hit, len(respHits.Hits)) + for i, respHit := range respHits.Hits { + hits.Hits[i] = protocol.Hit{Index: respHit.Index, ID: respHit.ID, Source: respHit.Source} + } + return hits, nil +} + +func transform(request protocol.QueryRequest) (indexlib.QueryRequest, error) { + query := request.Query + if query.MatchAll != nil { + return &indexlib.MatchAllQuery{}, nil + } else if query.Match != nil { + matches := *query.Match + if len(matches) != 1 { + return nil, errors.New("invalid match query") + } + matchQ := indexlib.MatchQuery{} + for k, v := range matches { + matchQ.Field = k + matchQ.Match = v.(string) + } + return &matchQ, nil + } else { + // TODO: need to be supported + return nil, errors.New("need to be supported") + } +} diff --git a/internal/query/query_handler.go b/internal/query/query_handler.go deleted file mode 100644 index 02c224c..0000000 --- a/internal/query/query_handler.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. - -package query - -import ( - "github.com/gin-gonic/gin" - "net/http" -) - -func QueryHandler(c *gin.Context) { - indexName := c.Param("index") - queryRequest := QueryRequest{Size: 10} - if err := c.ShouldBind(&queryRequest); err != nil { - c.String(http.StatusBadRequest, `invalid request`) - } - queryRequest.Index = indexName - // TODO do search... - c.JSON(http.StatusOK, queryRequest) -} diff --git a/internal/service/router.go b/internal/service/router.go index dfb163f..aff3624 100644 --- a/internal/service/router.go +++ b/internal/service/router.go @@ -4,9 +4,9 @@ package service import ( "github.com/gin-gonic/gin" - "github.com/tatris-io/tatris/internal/ingestion" - "github.com/tatris-io/tatris/internal/meta/handler" - "github.com/tatris-io/tatris/internal/query" + handler1 "github.com/tatris-io/tatris/internal/ingestion/handler" + handler2 "github.com/tatris-io/tatris/internal/meta/handler" + handler3 "github.com/tatris-io/tatris/internal/query/handler" ) func StartHTTPServer(roles ...string) { @@ -49,14 +49,14 @@ func StartHTTPServer(roles ...string) { } func registerIngestion(group *gin.RouterGroup) { - group.PUT("/:index/_ingest", ingestion.IngestHandler) + group.PUT("/:index/_ingest", handler1.IngestHandler) } func registerQuery(group *gin.RouterGroup) { - group.POST("/:index/_search", query.QueryHandler) + group.POST("/:index/_search", handler3.QueryHandler) } func registerMeta(group *gin.RouterGroup) { - group.PUT("/:index", handler.CreateIndexHandler) - group.GET("/:index", handler.GetIndexHandler) + group.PUT("/:index", handler2.CreateIndexHandler) + group.GET("/:index", handler2.GetIndexHandler) } From a2d5d4709b9824c31fde0148fa7b7ee964bb51bd Mon Sep 17 00:00:00 2001 From: xiangwanpeng Date: Thu, 5 Jan 2023 19:04:14 +0800 Subject: [PATCH 2/6] feat: record query costs --- internal/protocol/query_response.go | 2 +- internal/query/handler/query_handler.go | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/protocol/query_response.go b/internal/protocol/query_response.go index 5a79a59..c398709 100644 --- a/internal/protocol/query_response.go +++ b/internal/protocol/query_response.go @@ -3,7 +3,7 @@ package protocol type QueryResponse struct { - Took int64 `json:"took"` + Took int64 `json:"took"` // unit: ms TimedOut bool `json:"timedOut"` Shards Shards `json:"_shards"` Hits Hits `json:"hits"` diff --git a/internal/query/handler/query_handler.go b/internal/query/handler/query_handler.go index 8c540fe..4288067 100644 --- a/internal/query/handler/query_handler.go +++ b/internal/query/handler/query_handler.go @@ -8,9 +8,11 @@ import ( "github.com/tatris-io/tatris/internal/protocol" "github.com/tatris-io/tatris/internal/query" "net/http" + "time" ) func QueryHandler(c *gin.Context) { + start := time.Now() indexName := c.Param("index") queryRequest := protocol.QueryRequest{Size: 10} if err := c.ShouldBind(&queryRequest); err != nil { @@ -21,10 +23,9 @@ func QueryHandler(c *gin.Context) { if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) } else { - result := protocol.QueryResponse{} - result.Took = 0 - result.Hits = *hits - c.JSON(http.StatusOK, result) + resp := protocol.QueryResponse{} + resp.Hits = *hits + resp.Took = time.Since(start).Milliseconds() + c.JSON(http.StatusOK, resp) } - } From e514c5ca4777eec028e774f10ab79fd20e1075b4 Mon Sep 17 00:00:00 2001 From: xiangwanpeng Date: Thu, 5 Jan 2023 20:47:52 +0800 Subject: [PATCH 3/6] fix: fix data path --- internal/ingestion/ingest_doc.go | 5 +---- internal/meta/metadata/storage/boltdb.go | 2 +- internal/query/query_doc.go | 5 +---- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/internal/ingestion/ingest_doc.go b/internal/ingestion/ingest_doc.go index 6a80820..4e77f34 100644 --- a/internal/ingestion/ingest_doc.go +++ b/internal/ingestion/ingest_doc.go @@ -8,13 +8,10 @@ import ( "github.com/tatris-io/tatris/internal/indexlib" "github.com/tatris-io/tatris/internal/indexlib/manage" "log" - "os" ) -var wd, _ = os.Getwd() - // TODO: make it configurable -var dataPath = wd + "/../../../_data" +var dataPath = "/tmp/tatris/_data" func IngestDocs(idxName string, docs []map[string]interface{}) error { config := &indexlib.BaseConfig{ diff --git a/internal/meta/metadata/storage/boltdb.go b/internal/meta/metadata/storage/boltdb.go index 5749efd..cca8445 100644 --- a/internal/meta/metadata/storage/boltdb.go +++ b/internal/meta/metadata/storage/boltdb.go @@ -12,7 +12,7 @@ import ( ) const ( - BoltMetaPath = "./_meta.bolt" + BoltMetaPath = "/tmp/tatris/_meta.bolt" ) type BoltMetaStore struct { diff --git a/internal/query/query_doc.go b/internal/query/query_doc.go index 1f28c63..f89880b 100644 --- a/internal/query/query_doc.go +++ b/internal/query/query_doc.go @@ -8,13 +8,10 @@ import ( "github.com/tatris-io/tatris/internal/indexlib" "github.com/tatris-io/tatris/internal/indexlib/manage" "github.com/tatris-io/tatris/internal/protocol" - "os" ) -var wd, _ = os.Getwd() - // TODO: make it configurable -var dataPath = wd + "/../../../_data" +var dataPath = "/tmp/tatris/_data" func SearchDocs(request protocol.QueryRequest) (*protocol.Hits, error) { config := &indexlib.BaseConfig{ From fdc757f2ef69255659d1a56108ca0de59e30ea7c Mon Sep 17 00:00:00 2001 From: xiangwanpeng Date: Fri, 6 Jan 2023 10:44:49 +0800 Subject: [PATCH 4/6] test: test ci --- .github/workflows/check.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index eff90d0..59bb604 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -4,6 +4,7 @@ on: push: branches: - "main" + - "test/test-ci" pull_request: branches: - "main" From e30ef79950c339b026517a230572b8b6dd3bde71 Mon Sep 17 00:00:00 2001 From: xiangwanpeng Date: Fri, 6 Jan 2023 13:33:44 +0800 Subject: [PATCH 5/6] fix(meta): replace boltdb/bolt with etcd-io/bbolt --- go.mod | 2 +- go.sum | 3 +++ internal/meta/metadata/index.go | 3 ++- .../metadata/storage/{ => boltdb}/boltdb.go | 24 +++++++++---------- .../storage/{ => boltdb}/boltdb_test.go | 2 +- internal/meta/metadata/storage/metastore.go | 2 ++ 6 files changed, 21 insertions(+), 15 deletions(-) rename internal/meta/metadata/storage/{ => boltdb}/boltdb.go (69%) rename internal/meta/metadata/storage/{ => boltdb}/boltdb_test.go (97%) diff --git a/go.mod b/go.mod index 4a3e978..4d1ff60 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/axw/gocov v1.1.0 github.com/blugelabs/bluge v0.2.2 github.com/blugelabs/bluge_segment_api v0.2.0 - github.com/boltdb/bolt v1.3.1 github.com/gin-gonic/gin v1.8.2 github.com/mgechev/revive v1.2.4 github.com/stretchr/testify v1.8.1 @@ -58,6 +57,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/ugorji/go/codec v1.2.7 // indirect + go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.4.0 // indirect diff --git a/go.sum b/go.sum index 9623cf3..6650761 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,8 @@ github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= +go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -211,6 +213,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/meta/metadata/index.go b/internal/meta/metadata/index.go index da50f62..50341fa 100644 --- a/internal/meta/metadata/index.go +++ b/internal/meta/metadata/index.go @@ -6,13 +6,14 @@ package metadata import ( json "encoding/json" "github.com/tatris-io/tatris/internal/meta/metadata/storage" + "github.com/tatris-io/tatris/internal/meta/metadata/storage/boltdb" "github.com/tatris-io/tatris/internal/protocol" ) var metaStore storage.MetaStore func init() { - metaStore, _ = storage.Open() + metaStore, _ = boltdb.Open() } func Create(idx *protocol.Index) error { diff --git a/internal/meta/metadata/storage/boltdb.go b/internal/meta/metadata/storage/boltdb/boltdb.go similarity index 69% rename from internal/meta/metadata/storage/boltdb.go rename to internal/meta/metadata/storage/boltdb/boltdb.go index cca8445..0f3a9ae 100644 --- a/internal/meta/metadata/storage/boltdb.go +++ b/internal/meta/metadata/storage/boltdb/boltdb.go @@ -1,14 +1,14 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -// Package storage is about how to implement persistent storage of metadata -package storage +// Package boltdb describes an implementation of boltdb-based metadata storage +package boltdb import ( "bytes" "errors" + "github.com/tatris-io/tatris/internal/meta/metadata/storage" + "go.etcd.io/bbolt" "time" - - "github.com/boltdb/bolt" ) const ( @@ -16,27 +16,27 @@ const ( ) type BoltMetaStore struct { - db *bolt.DB + db *bbolt.DB } -func Open() (*BoltMetaStore, error) { - // Open the data file in your current directory. +func Open() (storage.MetaStore, error) { + // Open the data file. // It will be created if it doesn't exist. - db, err := bolt.Open(BoltMetaPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) + db, err := bbolt.Open(BoltMetaPath, 0600, &bbolt.Options{Timeout: 1 * time.Second}) if err != nil { return nil, err } return &BoltMetaStore{db}, nil } -func Close(store *BoltMetaStore) error { +func (store *BoltMetaStore) Close() error { return store.db.Close() } func (store *BoltMetaStore) Get(path string) ([]byte, error) { var result []byte bkt, key := splitPath(path) - err := store.db.View(func(tx *bolt.Tx) error { + err := store.db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bkt) if bucket == nil { return errors.New("bucket not found: " + string(bkt)) @@ -53,7 +53,7 @@ func (store *BoltMetaStore) Get(path string) ([]byte, error) { func (store *BoltMetaStore) Set(path string, val []byte) error { bkt, key := splitPath(path) - return store.db.Update(func(tx *bolt.Tx) error { + return store.db.Update(func(tx *bbolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(bkt) if err != nil { return err @@ -64,7 +64,7 @@ func (store *BoltMetaStore) Set(path string, val []byte) error { func (store *BoltMetaStore) Delete(path string) error { bkt, key := splitPath(path) - return store.db.Update(func(tx *bolt.Tx) error { + return store.db.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bkt) if bucket != nil { return bucket.Delete(key) diff --git a/internal/meta/metadata/storage/boltdb_test.go b/internal/meta/metadata/storage/boltdb/boltdb_test.go similarity index 97% rename from internal/meta/metadata/storage/boltdb_test.go rename to internal/meta/metadata/storage/boltdb/boltdb_test.go index 27640f9..7b51427 100644 --- a/internal/meta/metadata/storage/boltdb_test.go +++ b/internal/meta/metadata/storage/boltdb/boltdb_test.go @@ -1,6 +1,6 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. -package storage +package boltdb import ( "github.com/stretchr/testify/assert" diff --git a/internal/meta/metadata/storage/metastore.go b/internal/meta/metadata/storage/metastore.go index 0388731..ab4a501 100644 --- a/internal/meta/metadata/storage/metastore.go +++ b/internal/meta/metadata/storage/metastore.go @@ -1,9 +1,11 @@ // Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0. +// Package storage is about the physical storage of metadata package storage type MetaStore interface { Set(string, []byte) error Get(string) ([]byte, error) Delete(string) error + Close() error } From 47ffeeeb7283bb95454b4720d0594501837294f4 Mon Sep 17 00:00:00 2001 From: xiangwanpeng Date: Fri, 6 Jan 2023 13:55:25 +0800 Subject: [PATCH 6/6] fix: revert check.yml --- .github/workflows/check.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 59bb604..eff90d0 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -4,7 +4,6 @@ on: push: branches: - "main" - - "test/test-ci" pull_request: branches: - "main"