From 47a08131748bc895416c5686b5315782b45e9914 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 25 Mar 2020 00:33:27 -0600 Subject: [PATCH 1/4] messaging: add new priority field Signed-off-by: Derek Perkins --- go/test/endtoend/messaging/main_test.go | 6 ++-- go/test/endtoend/messaging/message_test.go | 6 ++-- go/vt/vttablet/tabletserver/messager/cache.go | 5 +-- .../tabletserver/messager/message_manager.go | 31 ++++++++++++------- .../messager/message_manager_test.go | 1 + .../tabletserver/query_executor_test.go | 3 ++ .../tabletserver/schema/engine_test.go | 3 ++ .../tabletserver/schema/load_table.go | 2 ++ .../tabletserver/schema/load_table_test.go | 8 +++-- .../schema/schematest/schematest.go | 3 ++ 10 files changed, 48 insertions(+), 20 deletions(-) diff --git a/go/test/endtoend/messaging/main_test.go b/go/test/endtoend/messaging/main_test.go index 0281a2ac129..36d2bf97517 100644 --- a/go/test/endtoend/messaging/main_test.go +++ b/go/test/endtoend/messaging/main_test.go @@ -42,22 +42,24 @@ var ( lookupKeyspace = "lookup" createShardedMessage = `create table sharded_message( id bigint, + priority bigint default 0, time_next bigint default 0, epoch bigint, time_acked bigint, message varchar(128), primary key(id), - index next_idx(time_next, epoch), + index next_idx(priority, time_next), index ack_idx(time_acked) ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` createUnshardedMessage = `create table unsharded_message( id bigint, + priority bigint default 0, time_next bigint default 0, epoch bigint, time_acked bigint, message varchar(128), primary key(id), - index next_idx(time_next, epoch), + index next_idx(priority, time_next), index ack_idx(time_acked) ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` userVschema = `{ diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index fe3851afeea..72312594067 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -31,12 +31,13 @@ import ( var createMessage = `create table vitess_message( id bigint, + priority bigint default 0, time_next bigint default 0, epoch bigint, time_acked bigint, message varchar(128), primary key(id), - index next_idx(time_next, epoch), + index next_idx(priority, time_next), index ack_idx(time_acked)) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` @@ -141,13 +142,14 @@ func TestMessage(t *testing.T) { var createThreeColMessage = `create table vitess_message3( id bigint, + priority bigint default 0, time_next bigint default 0, epoch bigint, time_acked bigint, msg1 varchar(128), msg2 bigint, primary key(id), - index next_idx(time_next, epoch), + index next_idx(priority, time_next), index ack_idx(time_acked)) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` diff --git a/go/vt/vttablet/tabletserver/messager/cache.go b/go/vt/vttablet/tabletserver/messager/cache.go index f2c1653c30e..0de059b7d32 100644 --- a/go/vt/vttablet/tabletserver/messager/cache.go +++ b/go/vt/vttablet/tabletserver/messager/cache.go @@ -28,6 +28,7 @@ import ( // MessageRow represents a message row. // The first column in Row is always the "id". type MessageRow struct { + Priority int64 TimeNext int64 Epoch int64 TimeAcked int64 @@ -47,8 +48,8 @@ func (mh messageHeap) Len() int { func (mh messageHeap) Less(i, j int) bool { // Lower epoch is more important. // If epochs match, newer messages are more important. - return mh[i].Epoch < mh[j].Epoch || - (mh[i].Epoch == mh[j].Epoch && mh[i].TimeNext > mh[j].TimeNext) + return mh[i].Priority < mh[j].Priority || + (mh[i].Priority == mh[j].Priority && mh[i].TimeNext > mh[j].TimeNext) } func (mh messageHeap) Swap(i, j int) { diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 51943d3bbdf..f418d8557e9 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -203,11 +203,11 @@ type messageManager struct { // The goroutine must in turn defer on Done. wg sync.WaitGroup - vsFilter *binlogdatapb.Filter - readByTimeNext *sqlparser.ParsedQuery - ackQuery *sqlparser.ParsedQuery - postponeQuery *sqlparser.ParsedQuery - purgeQuery *sqlparser.ParsedQuery + vsFilter *binlogdatapb.Filter + readByPriorityAndTimeNext *sqlparser.ParsedQuery + ackQuery *sqlparser.ParsedQuery + postponeQuery *sqlparser.ParsedQuery + purgeQuery *sqlparser.ParsedQuery } // newMessageManager creates a new message manager. @@ -233,15 +233,15 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos mm.cond.L = &mm.mu columnList := buildSelectColumnList(table) - vsQuery := fmt.Sprintf("select time_next, epoch, time_acked, %s from %v", columnList, mm.name) + vsQuery := fmt.Sprintf("select priority, time_next, epoch, time_acked, %s from %v", columnList, mm.name) mm.vsFilter = &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: table.Name.String(), Filter: vsQuery, }}, } - mm.readByTimeNext = sqlparser.BuildParsedQuery( - "select time_next, epoch, time_acked, %s from %v where time_next < %a order by time_next desc limit %a", + mm.readByPriorityAndTimeNext = sqlparser.BuildParsedQuery( + "select priority, time_next, epoch, time_acked, %s from %v where time_next < %a order by priority, time_next desc limit %a", columnList, mm.name, ":time_next", ":max") mm.ackQuery = sqlparser.BuildParsedQuery( "update %v set time_acked = %a, time_next = null where id in %a and time_acked is null", @@ -808,22 +808,29 @@ func (mm *messageManager) GeneratePurgeQuery(timeCutoff int64) (string, map[stri // BuildMessageRow builds a MessageRow for a db row. func BuildMessageRow(row []sqltypes.Value) (*MessageRow, error) { - mr := &MessageRow{Row: row[3:]} + mr := &MessageRow{Row: row[4:]} if !row[0].IsNull() { v, err := sqltypes.ToInt64(row[0]) if err != nil { return nil, err } - mr.TimeNext = v + mr.Priority = v } if !row[1].IsNull() { + v, err := sqltypes.ToInt64(row[0]) + if err != nil { + return nil, err + } + mr.TimeNext = v + } + if !row[2].IsNull() { v, err := sqltypes.ToInt64(row[1]) if err != nil { return nil, err } mr.Epoch = v } - if !row[2].IsNull() { + if !row[3].IsNull() { v, err := sqltypes.ToInt64(row[2]) if err != nil { return nil, err @@ -840,7 +847,7 @@ func (mm *messageManager) receiverCount() int { } func (mm *messageManager) readPending(ctx context.Context, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - query, err := mm.readByTimeNext.GenerateQuery(bindVars, nil) + query, err := mm.readByPriorityAndTimeNext.GenerateQuery(bindVars, nil) if err != nil { tabletenv.InternalErrors.Add("Messages", 1) log.Errorf("Error reading rows from message table: %v", err) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 51f29bfe967..92ca5bf147b 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -73,6 +73,7 @@ func newMMTable() *schema.Table { func newMMRow(id int64) *querypb.Row { return sqltypes.RowToProto3([]sqltypes.Value{ + sqltypes.NewInt64(1), sqltypes.NewInt64(1), sqltypes.NewInt64(0), sqltypes.NULL, diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index b4ae36ad2f5..15cda647e35 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1234,6 +1234,9 @@ func getQueryExecutorSupportedQueries(testTableHasMultipleUniqueKeys bool) map[s Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "priority", + Type: sqltypes.Int64, }, { Name: "time_next", Type: sqltypes.Int64, diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 75efd20e3ca..2af7a9dc2a3 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -375,6 +375,9 @@ func initialSchema() map[string]*Table { Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "priority", + Type: sqltypes.Int64, }, { Name: "time_next", Type: sqltypes.Int64, diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 45466d00a76..e46868e115c 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -58,6 +58,7 @@ func fetchColumns(ta *Table, conn *connpool.DBConn, sqlTableName string) error { func loadMessageInfo(ta *Table, comment string) error { hiddenCols := map[string]struct{}{ + "priority": {}, "time_next": {}, "epoch": {}, "time_acked": {}, @@ -65,6 +66,7 @@ func loadMessageInfo(ta *Table, comment string) error { requiredCols := []string{ "id", + "priority", "time_next", "epoch", "time_acked", diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index aae90320fac..02c9af95e18 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -97,6 +97,9 @@ func TestLoadTableMessage(t *testing.T) { Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "priority", + Type: sqltypes.Int64, }, { Name: "time_next", Type: sqltypes.Int64, @@ -178,13 +181,14 @@ func getTestLoadTableQueries() map[string]*sqltypes.Result { } func getMessageTableQueries() map[string]*sqltypes.Result { - // id is intentionally after the message column to ensure that the - // loader still makes it the first one. return map[string]*sqltypes.Result{ "select * from test_table where 1 != 1": { Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "priority", + Type: sqltypes.Int64, }, { Name: "time_next", Type: sqltypes.Int64, diff --git a/go/vt/vttablet/tabletserver/schema/schematest/schematest.go b/go/vt/vttablet/tabletserver/schema/schematest/schematest.go index d0458e25eee..64395121714 100644 --- a/go/vt/vttablet/tabletserver/schema/schematest/schematest.go +++ b/go/vt/vttablet/tabletserver/schema/schematest/schematest.go @@ -118,6 +118,9 @@ func Queries() map[string]*sqltypes.Result { Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "priority", + Type: sqltypes.Int64, }, { Name: "time_next", Type: sqltypes.Int64, From 64485a31b388cbbdf3abc533e281dae8828906c5 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 25 Mar 2020 01:10:42 -0600 Subject: [PATCH 2/4] messaging: fix tests Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/messager/cache_test.go | 5 +++++ go/vt/vttablet/tabletserver/messager/message_manager_test.go | 1 + 2 files changed, 6 insertions(+) diff --git a/go/vt/vttablet/tabletserver/messager/cache_test.go b/go/vt/vttablet/tabletserver/messager/cache_test.go index 41277d80f23..a4dd23ab732 100644 --- a/go/vt/vttablet/tabletserver/messager/cache_test.go +++ b/go/vt/vttablet/tabletserver/messager/cache_test.go @@ -26,6 +26,7 @@ import ( func TestMessagerCacheOrder(t *testing.T) { mc := newCache(10) if !mc.Add(&MessageRow{ + Priority: 1, TimeNext: 1, Epoch: 0, Row: []sqltypes.Value{sqltypes.NewVarBinary("row01")}, @@ -33,6 +34,7 @@ func TestMessagerCacheOrder(t *testing.T) { t.Fatal("Add returned false") } if !mc.Add(&MessageRow{ + Priority: 1, TimeNext: 2, Epoch: 0, Row: []sqltypes.Value{sqltypes.NewVarBinary("row02")}, @@ -40,6 +42,7 @@ func TestMessagerCacheOrder(t *testing.T) { t.Fatal("Add returned false") } if !mc.Add(&MessageRow{ + Priority: 2, TimeNext: 2, Epoch: 1, Row: []sqltypes.Value{sqltypes.NewVarBinary("row12")}, @@ -47,6 +50,7 @@ func TestMessagerCacheOrder(t *testing.T) { t.Fatal("Add returned false") } if !mc.Add(&MessageRow{ + Priority: 2, TimeNext: 1, Epoch: 1, Row: []sqltypes.Value{sqltypes.NewVarBinary("row11")}, @@ -54,6 +58,7 @@ func TestMessagerCacheOrder(t *testing.T) { t.Fatal("Add returned false") } if !mc.Add(&MessageRow{ + Priority: 1, TimeNext: 3, Epoch: 0, Row: []sqltypes.Value{sqltypes.NewVarBinary("row03")}, diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 92ca5bf147b..1cee1b698f5 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -52,6 +52,7 @@ var ( {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, + {Type: sqltypes.Int64}, {Type: sqltypes.VarBinary}, } ) From 5753a96734acbd68dbfe1023bc4c543bcb5c15f6 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 25 Mar 2020 08:11:03 -0600 Subject: [PATCH 3/4] messaging: more test fixes Signed-off-by: Derek Perkins --- .../tabletserver/planbuilder/testdata/schema_test.json | 7 ++----- go/vt/vttablet/tabletserver/tabletserver_flaky_test.go | 3 +++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json index 1a23e93eea4..00243d4b08b 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json @@ -313,10 +313,10 @@ "Name": "msg", "Columns": [ { - "Name": "time_scheduled" + "Name": "id" }, { - "Name": "id" + "Name": "priority" }, { "Name": "time_next" @@ -324,9 +324,6 @@ { "Name": "epoch" }, - { - "Name": "time_created" - }, { "Name": "time_acked" }, diff --git a/go/vt/vttablet/tabletserver/tabletserver_flaky_test.go b/go/vt/vttablet/tabletserver/tabletserver_flaky_test.go index 68b7e34cfee..193684f4fe4 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_flaky_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_flaky_test.go @@ -2591,6 +2591,9 @@ func getSupportedQueries() map[string]*sqltypes.Result { Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.Int64, + }, { + Name: "priority", + Type: sqltypes.Int64, }, { Name: "time_next", Type: sqltypes.Int64, From 54405e5c6362dfaf72f286ab5418527550591421 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 25 Mar 2020 08:55:08 -0600 Subject: [PATCH 4/4] messaging: desc time_next index + typo fix Signed-off-by: Derek Perkins --- go/test/endtoend/messaging/cluster_ops_test.go | 4 ++-- go/test/endtoend/messaging/main_test.go | 4 ++-- go/test/endtoend/messaging/message_test.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/messaging/cluster_ops_test.go b/go/test/endtoend/messaging/cluster_ops_test.go index 4396a92d8c4..53558f5b4d2 100644 --- a/go/test/endtoend/messaging/cluster_ops_test.go +++ b/go/test/endtoend/messaging/cluster_ops_test.go @@ -45,8 +45,8 @@ func TestUnsharded(t *testing.T) { testMessaging(t, "unsharded_message", lookupKeyspace) } -// TestRepareting checks the client connection count after reparenting. -func TestRepareting(t *testing.T) { +// TestReparenting checks the client connection count after reparenting. +func TestReparenting(t *testing.T) { defer cluster.PanicHandler(t) name := "sharded_message" diff --git a/go/test/endtoend/messaging/main_test.go b/go/test/endtoend/messaging/main_test.go index 36d2bf97517..eb311fb39c8 100644 --- a/go/test/endtoend/messaging/main_test.go +++ b/go/test/endtoend/messaging/main_test.go @@ -48,7 +48,7 @@ var ( time_acked bigint, message varchar(128), primary key(id), - index next_idx(priority, time_next), + index next_idx(priority, time_next desc), index ack_idx(time_acked) ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` createUnshardedMessage = `create table unsharded_message( @@ -59,7 +59,7 @@ var ( time_acked bigint, message varchar(128), primary key(id), - index next_idx(priority, time_next), + index next_idx(priority, time_next desc), index ack_idx(time_acked) ) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` userVschema = `{ diff --git a/go/test/endtoend/messaging/message_test.go b/go/test/endtoend/messaging/message_test.go index 72312594067..1183621dbf0 100644 --- a/go/test/endtoend/messaging/message_test.go +++ b/go/test/endtoend/messaging/message_test.go @@ -37,7 +37,7 @@ var createMessage = `create table vitess_message( time_acked bigint, message varchar(128), primary key(id), - index next_idx(priority, time_next), + index next_idx(priority, time_next desc), index ack_idx(time_acked)) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'` @@ -149,7 +149,7 @@ var createThreeColMessage = `create table vitess_message3( msg1 varchar(128), msg2 bigint, primary key(id), - index next_idx(priority, time_next), + index next_idx(priority, time_next desc), index ack_idx(time_acked)) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`