From 0223eb01b92e30e330fc967a9335e2c46aaf9b59 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 31 May 2021 13:05:02 +0530 Subject: [PATCH 1/3] proto changes for prepare and closeSession vtgate api Signed-off-by: Harshit Gangal --- go/vt/proto/vtadmin/vtadmin.pb.go | 2 +- go/vt/proto/vtgate/vtgate.pb.go | 460 +++++++-- go/vt/proto/vtgate/vtgate_vtproto.pb.go | 870 +++++++++++++++++- go/vt/proto/vtgateservice/vtgateservice.pb.go | 70 +- .../vtgateservice/vtgateservice_grpc.pb.go | 80 ++ proto/vtgate.proto | 45 + proto/vtgateservice.proto | 8 + 7 files changed, 1438 insertions(+), 97 deletions(-) diff --git a/go/vt/proto/vtadmin/vtadmin.pb.go b/go/vt/proto/vtadmin/vtadmin.pb.go index 86ffb3cc8ce..2dc553ad6d4 100644 --- a/go/vt/proto/vtadmin/vtadmin.pb.go +++ b/go/vt/proto/vtadmin/vtadmin.pb.go @@ -1324,7 +1324,7 @@ type GetTabletRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Unique tablet alias of the standard form: "$cell-$uid" + // Unique (per cluster) tablet alias of the standard form: "$cell-$uid" Alias string `protobuf:"bytes,1,opt,name=alias,proto3" json:"alias,omitempty"` // ClusterIDs is an optional parameter to narrow the scope of the search, if // the caller knows which cluster the tablet may be in, or, to disamiguate if diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index 880f36c0e9e..0b1a1933150 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -1231,6 +1231,253 @@ func (x *VStreamResponse) GetEvents() []*binlogdata.VEvent { return nil } +// PrepareRequest is the payload to Prepare. +type PrepareRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // caller_id identifies the caller. This is the effective caller ID, + // set by the application to further identify the caller. + CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id,json=callerId,proto3" json:"caller_id,omitempty"` + // session carries the session state. + Session *Session `protobuf:"bytes,2,opt,name=session,proto3" json:"session,omitempty"` + // query is the query and bind variables to execute. + Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query,proto3" json:"query,omitempty"` +} + +func (x *PrepareRequest) Reset() { + *x = PrepareRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_vtgate_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PrepareRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PrepareRequest) ProtoMessage() {} + +func (x *PrepareRequest) ProtoReflect() protoreflect.Message { + mi := &file_vtgate_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PrepareRequest.ProtoReflect.Descriptor instead. +func (*PrepareRequest) Descriptor() ([]byte, []int) { + return file_vtgate_proto_rawDescGZIP(), []int{13} +} + +func (x *PrepareRequest) GetCallerId() *vtrpc.CallerID { + if x != nil { + return x.CallerId + } + return nil +} + +func (x *PrepareRequest) GetSession() *Session { + if x != nil { + return x.Session + } + return nil +} + +func (x *PrepareRequest) GetQuery() *query.BoundQuery { + if x != nil { + return x.Query + } + return nil +} + +// PrepareResponse is the returned value from Prepare. +type PrepareResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // error contains an application level error if necessary. Note the + // session may have changed, even when an error is returned (for + // instance if a database integrity error happened). + Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + // session is the updated session information. + Session *Session `protobuf:"bytes,2,opt,name=session,proto3" json:"session,omitempty"` + // fields contains the fields, only set if error is unset. + Fields []*query.Field `protobuf:"bytes,3,rep,name=fields,proto3" json:"fields,omitempty"` +} + +func (x *PrepareResponse) Reset() { + *x = PrepareResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_vtgate_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PrepareResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PrepareResponse) ProtoMessage() {} + +func (x *PrepareResponse) ProtoReflect() protoreflect.Message { + mi := &file_vtgate_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PrepareResponse.ProtoReflect.Descriptor instead. +func (*PrepareResponse) Descriptor() ([]byte, []int) { + return file_vtgate_proto_rawDescGZIP(), []int{14} +} + +func (x *PrepareResponse) GetError() *vtrpc.RPCError { + if x != nil { + return x.Error + } + return nil +} + +func (x *PrepareResponse) GetSession() *Session { + if x != nil { + return x.Session + } + return nil +} + +func (x *PrepareResponse) GetFields() []*query.Field { + if x != nil { + return x.Fields + } + return nil +} + +// CloseSessionRequest is the payload to CloseSession. +type CloseSessionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // caller_id identifies the caller. This is the effective caller ID, + // set by the application to further identify the caller. + CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id,json=callerId,proto3" json:"caller_id,omitempty"` + // session carries the session state. + Session *Session `protobuf:"bytes,2,opt,name=session,proto3" json:"session,omitempty"` +} + +func (x *CloseSessionRequest) Reset() { + *x = CloseSessionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_vtgate_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CloseSessionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CloseSessionRequest) ProtoMessage() {} + +func (x *CloseSessionRequest) ProtoReflect() protoreflect.Message { + mi := &file_vtgate_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CloseSessionRequest.ProtoReflect.Descriptor instead. +func (*CloseSessionRequest) Descriptor() ([]byte, []int) { + return file_vtgate_proto_rawDescGZIP(), []int{15} +} + +func (x *CloseSessionRequest) GetCallerId() *vtrpc.CallerID { + if x != nil { + return x.CallerId + } + return nil +} + +func (x *CloseSessionRequest) GetSession() *Session { + if x != nil { + return x.Session + } + return nil +} + +// CloseSessionResponse is the returned value from CloseSession. +type CloseSessionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // error contains an application level error if necessary. Note the + // session may have changed, even when an error is returned (for + // instance if a database integrity error happened). + Error *vtrpc.RPCError `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` +} + +func (x *CloseSessionResponse) Reset() { + *x = CloseSessionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_vtgate_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CloseSessionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CloseSessionResponse) ProtoMessage() {} + +func (x *CloseSessionResponse) ProtoReflect() protoreflect.Message { + mi := &file_vtgate_proto_msgTypes[16] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CloseSessionResponse.ProtoReflect.Descriptor instead. +func (*CloseSessionResponse) Descriptor() ([]byte, []int) { + return file_vtgate_proto_rawDescGZIP(), []int{16} +} + +func (x *CloseSessionResponse) GetError() *vtrpc.RPCError { + if x != nil { + return x.Error + } + return nil +} + type Session_ShardSession struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1246,7 +1493,7 @@ type Session_ShardSession struct { func (x *Session_ShardSession) Reset() { *x = Session_ShardSession{} if protoimpl.UnsafeEnabled { - mi := &file_vtgate_proto_msgTypes[13] + mi := &file_vtgate_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1259,7 +1506,7 @@ func (x *Session_ShardSession) String() string { func (*Session_ShardSession) ProtoMessage() {} func (x *Session_ShardSession) ProtoReflect() protoreflect.Message { - mi := &file_vtgate_proto_msgTypes[13] + mi := &file_vtgate_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1527,7 +1774,36 @@ var file_vtgate_proto_rawDesc = []byte{ 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2a, 0x44, 0x0a, 0x0f, 0x54, + 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, + 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, + 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, + 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, + 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, + 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, + 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, + 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, + 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, + 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, + 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, + 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, + 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, + 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, + 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, + 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, + 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, @@ -1555,7 +1831,7 @@ func file_vtgate_proto_rawDescGZIP() []byte { } var file_vtgate_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_vtgate_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_vtgate_proto_msgTypes = make([]protoimpl.MessageInfo, 20) var file_vtgate_proto_goTypes = []interface{}{ (TransactionMode)(0), // 0: vtgate.TransactionMode (CommitOrder)(0), // 1: vtgate.CommitOrder @@ -1572,72 +1848,86 @@ var file_vtgate_proto_goTypes = []interface{}{ (*VStreamFlags)(nil), // 12: vtgate.VStreamFlags (*VStreamRequest)(nil), // 13: vtgate.VStreamRequest (*VStreamResponse)(nil), // 14: vtgate.VStreamResponse - (*Session_ShardSession)(nil), // 15: vtgate.Session.ShardSession - nil, // 16: vtgate.Session.UserDefinedVariablesEntry - nil, // 17: vtgate.Session.SystemVariablesEntry - (*query.ExecuteOptions)(nil), // 18: query.ExecuteOptions - (*query.QueryWarning)(nil), // 19: query.QueryWarning - (*vtrpc.CallerID)(nil), // 20: vtrpc.CallerID - (*query.BoundQuery)(nil), // 21: query.BoundQuery - (topodata.TabletType)(0), // 22: topodata.TabletType - (*vtrpc.RPCError)(nil), // 23: vtrpc.RPCError - (*query.QueryResult)(nil), // 24: query.QueryResult - (*query.ResultWithError)(nil), // 25: query.ResultWithError - (*binlogdata.VGtid)(nil), // 26: binlogdata.VGtid - (*binlogdata.Filter)(nil), // 27: binlogdata.Filter - (*binlogdata.VEvent)(nil), // 28: binlogdata.VEvent - (*query.Target)(nil), // 29: query.Target - (*topodata.TabletAlias)(nil), // 30: topodata.TabletAlias - (*query.BindVariable)(nil), // 31: query.BindVariable + (*PrepareRequest)(nil), // 15: vtgate.PrepareRequest + (*PrepareResponse)(nil), // 16: vtgate.PrepareResponse + (*CloseSessionRequest)(nil), // 17: vtgate.CloseSessionRequest + (*CloseSessionResponse)(nil), // 18: vtgate.CloseSessionResponse + (*Session_ShardSession)(nil), // 19: vtgate.Session.ShardSession + nil, // 20: vtgate.Session.UserDefinedVariablesEntry + nil, // 21: vtgate.Session.SystemVariablesEntry + (*query.ExecuteOptions)(nil), // 22: query.ExecuteOptions + (*query.QueryWarning)(nil), // 23: query.QueryWarning + (*vtrpc.CallerID)(nil), // 24: vtrpc.CallerID + (*query.BoundQuery)(nil), // 25: query.BoundQuery + (topodata.TabletType)(0), // 26: topodata.TabletType + (*vtrpc.RPCError)(nil), // 27: vtrpc.RPCError + (*query.QueryResult)(nil), // 28: query.QueryResult + (*query.ResultWithError)(nil), // 29: query.ResultWithError + (*binlogdata.VGtid)(nil), // 30: binlogdata.VGtid + (*binlogdata.Filter)(nil), // 31: binlogdata.Filter + (*binlogdata.VEvent)(nil), // 32: binlogdata.VEvent + (*query.Field)(nil), // 33: query.Field + (*query.Target)(nil), // 34: query.Target + (*topodata.TabletAlias)(nil), // 35: topodata.TabletAlias + (*query.BindVariable)(nil), // 36: query.BindVariable } var file_vtgate_proto_depIdxs = []int32{ - 15, // 0: vtgate.Session.shard_sessions:type_name -> vtgate.Session.ShardSession - 18, // 1: vtgate.Session.options:type_name -> query.ExecuteOptions + 19, // 0: vtgate.Session.shard_sessions:type_name -> vtgate.Session.ShardSession + 22, // 1: vtgate.Session.options:type_name -> query.ExecuteOptions 0, // 2: vtgate.Session.transaction_mode:type_name -> vtgate.TransactionMode - 19, // 3: vtgate.Session.warnings:type_name -> query.QueryWarning - 15, // 4: vtgate.Session.pre_sessions:type_name -> vtgate.Session.ShardSession - 15, // 5: vtgate.Session.post_sessions:type_name -> vtgate.Session.ShardSession - 16, // 6: vtgate.Session.user_defined_variables:type_name -> vtgate.Session.UserDefinedVariablesEntry - 17, // 7: vtgate.Session.system_variables:type_name -> vtgate.Session.SystemVariablesEntry - 15, // 8: vtgate.Session.lock_session:type_name -> vtgate.Session.ShardSession + 23, // 3: vtgate.Session.warnings:type_name -> query.QueryWarning + 19, // 4: vtgate.Session.pre_sessions:type_name -> vtgate.Session.ShardSession + 19, // 5: vtgate.Session.post_sessions:type_name -> vtgate.Session.ShardSession + 20, // 6: vtgate.Session.user_defined_variables:type_name -> vtgate.Session.UserDefinedVariablesEntry + 21, // 7: vtgate.Session.system_variables:type_name -> vtgate.Session.SystemVariablesEntry + 19, // 8: vtgate.Session.lock_session:type_name -> vtgate.Session.ShardSession 3, // 9: vtgate.Session.read_after_write:type_name -> vtgate.ReadAfterWrite - 20, // 10: vtgate.ExecuteRequest.caller_id:type_name -> vtrpc.CallerID + 24, // 10: vtgate.ExecuteRequest.caller_id:type_name -> vtrpc.CallerID 2, // 11: vtgate.ExecuteRequest.session:type_name -> vtgate.Session - 21, // 12: vtgate.ExecuteRequest.query:type_name -> query.BoundQuery - 22, // 13: vtgate.ExecuteRequest.tablet_type:type_name -> topodata.TabletType - 18, // 14: vtgate.ExecuteRequest.options:type_name -> query.ExecuteOptions - 23, // 15: vtgate.ExecuteResponse.error:type_name -> vtrpc.RPCError + 25, // 12: vtgate.ExecuteRequest.query:type_name -> query.BoundQuery + 26, // 13: vtgate.ExecuteRequest.tablet_type:type_name -> topodata.TabletType + 22, // 14: vtgate.ExecuteRequest.options:type_name -> query.ExecuteOptions + 27, // 15: vtgate.ExecuteResponse.error:type_name -> vtrpc.RPCError 2, // 16: vtgate.ExecuteResponse.session:type_name -> vtgate.Session - 24, // 17: vtgate.ExecuteResponse.result:type_name -> query.QueryResult - 20, // 18: vtgate.ExecuteBatchRequest.caller_id:type_name -> vtrpc.CallerID + 28, // 17: vtgate.ExecuteResponse.result:type_name -> query.QueryResult + 24, // 18: vtgate.ExecuteBatchRequest.caller_id:type_name -> vtrpc.CallerID 2, // 19: vtgate.ExecuteBatchRequest.session:type_name -> vtgate.Session - 21, // 20: vtgate.ExecuteBatchRequest.queries:type_name -> query.BoundQuery - 22, // 21: vtgate.ExecuteBatchRequest.tablet_type:type_name -> topodata.TabletType - 18, // 22: vtgate.ExecuteBatchRequest.options:type_name -> query.ExecuteOptions - 23, // 23: vtgate.ExecuteBatchResponse.error:type_name -> vtrpc.RPCError + 25, // 20: vtgate.ExecuteBatchRequest.queries:type_name -> query.BoundQuery + 26, // 21: vtgate.ExecuteBatchRequest.tablet_type:type_name -> topodata.TabletType + 22, // 22: vtgate.ExecuteBatchRequest.options:type_name -> query.ExecuteOptions + 27, // 23: vtgate.ExecuteBatchResponse.error:type_name -> vtrpc.RPCError 2, // 24: vtgate.ExecuteBatchResponse.session:type_name -> vtgate.Session - 25, // 25: vtgate.ExecuteBatchResponse.results:type_name -> query.ResultWithError - 20, // 26: vtgate.StreamExecuteRequest.caller_id:type_name -> vtrpc.CallerID - 21, // 27: vtgate.StreamExecuteRequest.query:type_name -> query.BoundQuery - 22, // 28: vtgate.StreamExecuteRequest.tablet_type:type_name -> topodata.TabletType - 18, // 29: vtgate.StreamExecuteRequest.options:type_name -> query.ExecuteOptions + 29, // 25: vtgate.ExecuteBatchResponse.results:type_name -> query.ResultWithError + 24, // 26: vtgate.StreamExecuteRequest.caller_id:type_name -> vtrpc.CallerID + 25, // 27: vtgate.StreamExecuteRequest.query:type_name -> query.BoundQuery + 26, // 28: vtgate.StreamExecuteRequest.tablet_type:type_name -> topodata.TabletType + 22, // 29: vtgate.StreamExecuteRequest.options:type_name -> query.ExecuteOptions 2, // 30: vtgate.StreamExecuteRequest.session:type_name -> vtgate.Session - 24, // 31: vtgate.StreamExecuteResponse.result:type_name -> query.QueryResult - 20, // 32: vtgate.ResolveTransactionRequest.caller_id:type_name -> vtrpc.CallerID - 20, // 33: vtgate.VStreamRequest.caller_id:type_name -> vtrpc.CallerID - 22, // 34: vtgate.VStreamRequest.tablet_type:type_name -> topodata.TabletType - 26, // 35: vtgate.VStreamRequest.vgtid:type_name -> binlogdata.VGtid - 27, // 36: vtgate.VStreamRequest.filter:type_name -> binlogdata.Filter + 28, // 31: vtgate.StreamExecuteResponse.result:type_name -> query.QueryResult + 24, // 32: vtgate.ResolveTransactionRequest.caller_id:type_name -> vtrpc.CallerID + 24, // 33: vtgate.VStreamRequest.caller_id:type_name -> vtrpc.CallerID + 26, // 34: vtgate.VStreamRequest.tablet_type:type_name -> topodata.TabletType + 30, // 35: vtgate.VStreamRequest.vgtid:type_name -> binlogdata.VGtid + 31, // 36: vtgate.VStreamRequest.filter:type_name -> binlogdata.Filter 12, // 37: vtgate.VStreamRequest.flags:type_name -> vtgate.VStreamFlags - 28, // 38: vtgate.VStreamResponse.events:type_name -> binlogdata.VEvent - 29, // 39: vtgate.Session.ShardSession.target:type_name -> query.Target - 30, // 40: vtgate.Session.ShardSession.tablet_alias:type_name -> topodata.TabletAlias - 31, // 41: vtgate.Session.UserDefinedVariablesEntry.value:type_name -> query.BindVariable - 42, // [42:42] is the sub-list for method output_type - 42, // [42:42] is the sub-list for method input_type - 42, // [42:42] is the sub-list for extension type_name - 42, // [42:42] is the sub-list for extension extendee - 0, // [0:42] is the sub-list for field type_name + 32, // 38: vtgate.VStreamResponse.events:type_name -> binlogdata.VEvent + 24, // 39: vtgate.PrepareRequest.caller_id:type_name -> vtrpc.CallerID + 2, // 40: vtgate.PrepareRequest.session:type_name -> vtgate.Session + 25, // 41: vtgate.PrepareRequest.query:type_name -> query.BoundQuery + 27, // 42: vtgate.PrepareResponse.error:type_name -> vtrpc.RPCError + 2, // 43: vtgate.PrepareResponse.session:type_name -> vtgate.Session + 33, // 44: vtgate.PrepareResponse.fields:type_name -> query.Field + 24, // 45: vtgate.CloseSessionRequest.caller_id:type_name -> vtrpc.CallerID + 2, // 46: vtgate.CloseSessionRequest.session:type_name -> vtgate.Session + 27, // 47: vtgate.CloseSessionResponse.error:type_name -> vtrpc.RPCError + 34, // 48: vtgate.Session.ShardSession.target:type_name -> query.Target + 35, // 49: vtgate.Session.ShardSession.tablet_alias:type_name -> topodata.TabletAlias + 36, // 50: vtgate.Session.UserDefinedVariablesEntry.value:type_name -> query.BindVariable + 51, // [51:51] is the sub-list for method output_type + 51, // [51:51] is the sub-list for method input_type + 51, // [51:51] is the sub-list for extension type_name + 51, // [51:51] is the sub-list for extension extendee + 0, // [0:51] is the sub-list for field type_name } func init() { file_vtgate_proto_init() } @@ -1803,6 +2093,54 @@ func file_vtgate_proto_init() { } } file_vtgate_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PrepareRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_vtgate_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PrepareResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_vtgate_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CloseSessionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_vtgate_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CloseSessionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_vtgate_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Session_ShardSession); i { case 0: return &v.state @@ -1821,7 +2159,7 @@ func file_vtgate_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_vtgate_proto_rawDesc, NumEnums: 2, - NumMessages: 16, + NumMessages: 20, NumExtensions: 0, NumServices: 0, }, diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index 7570dc6e47f..9ea4dc9e7a9 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -1163,6 +1163,248 @@ func (m *VStreamResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *PrepareRequest) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PrepareRequest) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *PrepareRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Query != nil { + { + size, err := m.Query.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.Session != nil { + { + size, err := m.Session.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.CallerId != nil { + { + size, err := m.CallerId.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *PrepareResponse) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PrepareResponse) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *PrepareResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Fields) > 0 { + for iNdEx := len(m.Fields) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Fields[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } + if m.Session != nil { + { + size, err := m.Session.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Error != nil { + { + size, err := m.Error.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *CloseSessionRequest) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CloseSessionRequest) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *CloseSessionRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Session != nil { + { + size, err := m.Session.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.CallerId != nil { + { + size, err := m.CallerId.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *CloseSessionResponse) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CloseSessionResponse) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *CloseSessionResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Error != nil { + { + size, err := m.Error.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarint(dAtA []byte, offset int, v uint64) int { offset -= sov(v) base := offset @@ -1608,15 +1850,101 @@ func (m *VStreamResponse) SizeVT() (n int) { return n } -func sov(x uint64) (n int) { - return (bits.Len64(x|1) + 6) / 7 -} -func soz(x uint64) (n int) { - return sov(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +func (m *PrepareRequest) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.CallerId != nil { + l = m.CallerId.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.Session != nil { + l = m.Session.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.Query != nil { + l = m.Query.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.unknownFields != nil { + n += len(m.unknownFields) + } + return n } -func (m *Session_ShardSession) UnmarshalVT(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 + +func (m *PrepareResponse) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Error != nil { + l = m.Error.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.Session != nil { + l = m.Session.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if len(m.Fields) > 0 { + for _, e := range m.Fields { + l = e.SizeVT() + n += 1 + l + sov(uint64(l)) + } + } + if m.unknownFields != nil { + n += len(m.unknownFields) + } + return n +} + +func (m *CloseSessionRequest) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.CallerId != nil { + l = m.CallerId.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.Session != nil { + l = m.Session.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.unknownFields != nil { + n += len(m.unknownFields) + } + return n +} + +func (m *CloseSessionResponse) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Error != nil { + l = m.Error.SizeVT() + n += 1 + l + sov(uint64(l)) + } + if m.unknownFields != nil { + n += len(m.unknownFields) + } + return n +} + +func sov(x uint64) (n int) { + return (bits.Len64(x|1) + 6) / 7 +} +func soz(x uint64) (n int) { + return sov(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Session_ShardSession) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 for iNdEx < l { preIndex := iNdEx var wire uint64 @@ -4442,6 +4770,532 @@ func (m *VStreamResponse) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *PrepareRequest) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrepareRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrepareRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CallerId", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.CallerId == nil { + m.CallerId = &vtrpc.CallerID{} + } + if err := m.CallerId.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Session", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Session == nil { + m.Session = &Session{} + } + if err := m.Session.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Query == nil { + m.Query = &query.BoundQuery{} + } + if err := m.Query.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PrepareResponse) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PrepareResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PrepareResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Error == nil { + m.Error = &vtrpc.RPCError{} + } + if err := m.Error.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Session", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Session == nil { + m.Session = &Session{} + } + if err := m.Session.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Fields", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Fields = append(m.Fields, &query.Field{}) + if err := m.Fields[len(m.Fields)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *CloseSessionRequest) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CloseSessionRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CloseSessionRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CallerId", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.CallerId == nil { + m.CallerId = &vtrpc.CallerID{} + } + if err := m.CallerId.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Session", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Session == nil { + m.Session = &Session{} + } + if err := m.Session.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *CloseSessionResponse) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CloseSessionResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CloseSessionResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Error == nil { + m.Error = &vtrpc.RPCError{} + } + if err := m.Error.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skip(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/go/vt/proto/vtgateservice/vtgateservice.pb.go b/go/vt/proto/vtgateservice/vtgateservice.pb.go index 30ee52a10f5..edfd8055e6f 100644 --- a/go/vt/proto/vtgateservice/vtgateservice.pb.go +++ b/go/vt/proto/vtgateservice/vtgateservice.pb.go @@ -44,7 +44,7 @@ var file_vtgateservice_proto_rawDesc = []byte{ 0x0a, 0x13, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x1a, 0x0c, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x32, 0x84, 0x03, 0x0a, 0x06, 0x56, 0x69, 0x74, 0x65, 0x73, 0x73, 0x12, 0x3c, 0x0a, + 0x74, 0x6f, 0x32, 0x8f, 0x04, 0x0a, 0x06, 0x56, 0x69, 0x74, 0x65, 0x73, 0x73, 0x12, 0x3c, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, @@ -68,12 +68,20 @@ var file_vtgateservice_proto_rawDesc = []byte{ 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x42, 0x0a, 0x14, 0x69, 0x6f, 0x2e, - 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x67, 0x72, 0x70, - 0x63, 0x5a, 0x2a, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, - 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3c, 0x0a, 0x07, 0x50, 0x72, 0x65, + 0x70, 0x61, 0x72, 0x65, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x50, 0x72, + 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, + 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, 0x65, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, + 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x43, 0x6c, + 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x42, 0x0a, 0x14, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, + 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x5a, 0x2a, 0x76, 0x69, + 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, + 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, + 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_vtgateservice_proto_goTypes = []interface{}{ @@ -82,28 +90,36 @@ var file_vtgateservice_proto_goTypes = []interface{}{ (*vtgate.StreamExecuteRequest)(nil), // 2: vtgate.StreamExecuteRequest (*vtgate.ResolveTransactionRequest)(nil), // 3: vtgate.ResolveTransactionRequest (*vtgate.VStreamRequest)(nil), // 4: vtgate.VStreamRequest - (*vtgate.ExecuteResponse)(nil), // 5: vtgate.ExecuteResponse - (*vtgate.ExecuteBatchResponse)(nil), // 6: vtgate.ExecuteBatchResponse - (*vtgate.StreamExecuteResponse)(nil), // 7: vtgate.StreamExecuteResponse - (*vtgate.ResolveTransactionResponse)(nil), // 8: vtgate.ResolveTransactionResponse - (*vtgate.VStreamResponse)(nil), // 9: vtgate.VStreamResponse + (*vtgate.PrepareRequest)(nil), // 5: vtgate.PrepareRequest + (*vtgate.CloseSessionRequest)(nil), // 6: vtgate.CloseSessionRequest + (*vtgate.ExecuteResponse)(nil), // 7: vtgate.ExecuteResponse + (*vtgate.ExecuteBatchResponse)(nil), // 8: vtgate.ExecuteBatchResponse + (*vtgate.StreamExecuteResponse)(nil), // 9: vtgate.StreamExecuteResponse + (*vtgate.ResolveTransactionResponse)(nil), // 10: vtgate.ResolveTransactionResponse + (*vtgate.VStreamResponse)(nil), // 11: vtgate.VStreamResponse + (*vtgate.PrepareResponse)(nil), // 12: vtgate.PrepareResponse + (*vtgate.CloseSessionResponse)(nil), // 13: vtgate.CloseSessionResponse } var file_vtgateservice_proto_depIdxs = []int32{ - 0, // 0: vtgateservice.Vitess.Execute:input_type -> vtgate.ExecuteRequest - 1, // 1: vtgateservice.Vitess.ExecuteBatch:input_type -> vtgate.ExecuteBatchRequest - 2, // 2: vtgateservice.Vitess.StreamExecute:input_type -> vtgate.StreamExecuteRequest - 3, // 3: vtgateservice.Vitess.ResolveTransaction:input_type -> vtgate.ResolveTransactionRequest - 4, // 4: vtgateservice.Vitess.VStream:input_type -> vtgate.VStreamRequest - 5, // 5: vtgateservice.Vitess.Execute:output_type -> vtgate.ExecuteResponse - 6, // 6: vtgateservice.Vitess.ExecuteBatch:output_type -> vtgate.ExecuteBatchResponse - 7, // 7: vtgateservice.Vitess.StreamExecute:output_type -> vtgate.StreamExecuteResponse - 8, // 8: vtgateservice.Vitess.ResolveTransaction:output_type -> vtgate.ResolveTransactionResponse - 9, // 9: vtgateservice.Vitess.VStream:output_type -> vtgate.VStreamResponse - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 0, // 0: vtgateservice.Vitess.Execute:input_type -> vtgate.ExecuteRequest + 1, // 1: vtgateservice.Vitess.ExecuteBatch:input_type -> vtgate.ExecuteBatchRequest + 2, // 2: vtgateservice.Vitess.StreamExecute:input_type -> vtgate.StreamExecuteRequest + 3, // 3: vtgateservice.Vitess.ResolveTransaction:input_type -> vtgate.ResolveTransactionRequest + 4, // 4: vtgateservice.Vitess.VStream:input_type -> vtgate.VStreamRequest + 5, // 5: vtgateservice.Vitess.Prepare:input_type -> vtgate.PrepareRequest + 6, // 6: vtgateservice.Vitess.CloseSession:input_type -> vtgate.CloseSessionRequest + 7, // 7: vtgateservice.Vitess.Execute:output_type -> vtgate.ExecuteResponse + 8, // 8: vtgateservice.Vitess.ExecuteBatch:output_type -> vtgate.ExecuteBatchResponse + 9, // 9: vtgateservice.Vitess.StreamExecute:output_type -> vtgate.StreamExecuteResponse + 10, // 10: vtgateservice.Vitess.ResolveTransaction:output_type -> vtgate.ResolveTransactionResponse + 11, // 11: vtgateservice.Vitess.VStream:output_type -> vtgate.VStreamResponse + 12, // 12: vtgateservice.Vitess.Prepare:output_type -> vtgate.PrepareResponse + 13, // 13: vtgateservice.Vitess.CloseSession:output_type -> vtgate.CloseSessionResponse + 7, // [7:14] is the sub-list for method output_type + 0, // [0:7] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } func init() { file_vtgateservice_proto_init() } diff --git a/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go b/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go index e768510158f..41e45ccb8c8 100644 --- a/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go +++ b/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go @@ -40,6 +40,12 @@ type VitessClient interface { ResolveTransaction(ctx context.Context, in *vtgate.ResolveTransactionRequest, opts ...grpc.CallOption) (*vtgate.ResolveTransactionResponse, error) // VStream streams binlog events from the requested sources. VStream(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (Vitess_VStreamClient, error) + // Prepare is used by the MySQL server plugin as part of supporting prepared statements. + Prepare(ctx context.Context, in *vtgate.PrepareRequest, opts ...grpc.CallOption) (*vtgate.PrepareResponse, error) + // CloseSession closes the session, rolling back any implicit transactions. + // This has the same effect as if a "rollback" statement was executed, + // but does not affect the query statistics. + CloseSession(ctx context.Context, in *vtgate.CloseSessionRequest, opts ...grpc.CallOption) (*vtgate.CloseSessionResponse, error) } type vitessClient struct { @@ -141,6 +147,24 @@ func (x *vitessVStreamClient) Recv() (*vtgate.VStreamResponse, error) { return m, nil } +func (c *vitessClient) Prepare(ctx context.Context, in *vtgate.PrepareRequest, opts ...grpc.CallOption) (*vtgate.PrepareResponse, error) { + out := new(vtgate.PrepareResponse) + err := c.cc.Invoke(ctx, "/vtgateservice.Vitess/Prepare", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *vitessClient) CloseSession(ctx context.Context, in *vtgate.CloseSessionRequest, opts ...grpc.CallOption) (*vtgate.CloseSessionResponse, error) { + out := new(vtgate.CloseSessionResponse) + err := c.cc.Invoke(ctx, "/vtgateservice.Vitess/CloseSession", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // VitessServer is the server API for Vitess service. // All implementations must embed UnimplementedVitessServer // for forward compatibility @@ -166,6 +190,12 @@ type VitessServer interface { ResolveTransaction(context.Context, *vtgate.ResolveTransactionRequest) (*vtgate.ResolveTransactionResponse, error) // VStream streams binlog events from the requested sources. VStream(*vtgate.VStreamRequest, Vitess_VStreamServer) error + // Prepare is used by the MySQL server plugin as part of supporting prepared statements. + Prepare(context.Context, *vtgate.PrepareRequest) (*vtgate.PrepareResponse, error) + // CloseSession closes the session, rolling back any implicit transactions. + // This has the same effect as if a "rollback" statement was executed, + // but does not affect the query statistics. + CloseSession(context.Context, *vtgate.CloseSessionRequest) (*vtgate.CloseSessionResponse, error) mustEmbedUnimplementedVitessServer() } @@ -188,6 +218,12 @@ func (UnimplementedVitessServer) ResolveTransaction(context.Context, *vtgate.Res func (UnimplementedVitessServer) VStream(*vtgate.VStreamRequest, Vitess_VStreamServer) error { return status.Errorf(codes.Unimplemented, "method VStream not implemented") } +func (UnimplementedVitessServer) Prepare(context.Context, *vtgate.PrepareRequest) (*vtgate.PrepareResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Prepare not implemented") +} +func (UnimplementedVitessServer) CloseSession(context.Context, *vtgate.CloseSessionRequest) (*vtgate.CloseSessionResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CloseSession not implemented") +} func (UnimplementedVitessServer) mustEmbedUnimplementedVitessServer() {} // UnsafeVitessServer may be embedded to opt out of forward compatibility for this service. @@ -297,6 +333,42 @@ func (x *vitessVStreamServer) Send(m *vtgate.VStreamResponse) error { return x.ServerStream.SendMsg(m) } +func _Vitess_Prepare_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(vtgate.PrepareRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(VitessServer).Prepare(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/vtgateservice.Vitess/Prepare", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(VitessServer).Prepare(ctx, req.(*vtgate.PrepareRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Vitess_CloseSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(vtgate.CloseSessionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(VitessServer).CloseSession(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/vtgateservice.Vitess/CloseSession", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(VitessServer).CloseSession(ctx, req.(*vtgate.CloseSessionRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Vitess_ServiceDesc is the grpc.ServiceDesc for Vitess service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -316,6 +388,14 @@ var Vitess_ServiceDesc = grpc.ServiceDesc{ MethodName: "ResolveTransaction", Handler: _Vitess_ResolveTransaction_Handler, }, + { + MethodName: "Prepare", + Handler: _Vitess_Prepare_Handler, + }, + { + MethodName: "CloseSession", + Handler: _Vitess_CloseSession_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/proto/vtgate.proto b/proto/vtgate.proto index e2714899d79..01e03fb8b73 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -292,3 +292,48 @@ message VStreamRequest { message VStreamResponse { repeated binlogdata.VEvent events = 1; } + +// PrepareRequest is the payload to Prepare. +message PrepareRequest { + // caller_id identifies the caller. This is the effective caller ID, + // set by the application to further identify the caller. + vtrpc.CallerID caller_id = 1; + + // session carries the session state. + Session session = 2; + + // query is the query and bind variables to execute. + query.BoundQuery query = 3; +} + +// PrepareResponse is the returned value from Prepare. +message PrepareResponse { + // error contains an application level error if necessary. Note the + // session may have changed, even when an error is returned (for + // instance if a database integrity error happened). + vtrpc.RPCError error = 1; + + // session is the updated session information. + Session session = 2; + + // fields contains the fields, only set if error is unset. + repeated query.Field fields = 3; +} + +// CloseSessionRequest is the payload to CloseSession. +message CloseSessionRequest { + // caller_id identifies the caller. This is the effective caller ID, + // set by the application to further identify the caller. + vtrpc.CallerID caller_id = 1; + + // session carries the session state. + Session session = 2; +} + +// CloseSessionResponse is the returned value from CloseSession. +message CloseSessionResponse { + // error contains an application level error if necessary. Note the + // session may have changed, even when an error is returned (for + // instance if a database integrity error happened). + vtrpc.RPCError error = 1; +} diff --git a/proto/vtgateservice.proto b/proto/vtgateservice.proto index 894b969792c..745302ecdad 100644 --- a/proto/vtgateservice.proto +++ b/proto/vtgateservice.proto @@ -54,4 +54,12 @@ service Vitess { // VStream streams binlog events from the requested sources. rpc VStream(vtgate.VStreamRequest) returns (stream vtgate.VStreamResponse) {}; + + // Prepare is used by the MySQL server plugin as part of supporting prepared statements. + rpc Prepare(vtgate.PrepareRequest) returns (vtgate.PrepareResponse) {}; + + // CloseSession closes the session, rolling back any implicit transactions. + // This has the same effect as if a "rollback" statement was executed, + // but does not affect the query statistics. + rpc CloseSession(vtgate.CloseSessionRequest) returns (vtgate.CloseSessionResponse) {}; } From 0626487f51e584b0c28436f633d317cd37920102 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 31 May 2021 13:05:47 +0530 Subject: [PATCH 2/3] add prepare and closesession method to the vtgateservice interface Signed-off-by: Harshit Gangal --- go/cmd/vtgateclienttest/services/errors.go | 14 ++++++++++++++ go/cmd/vtgateclienttest/services/fallback.go | 8 ++++++++ go/cmd/vtgateclienttest/services/terminal.go | 8 ++++++++ go/vt/vtgate/grpcvtgateconn/suite_test.go | 10 ++++++++++ go/vt/vtgate/vtgateservice/interface.go | 7 +++++++ 5 files changed, 47 insertions(+) diff --git a/go/cmd/vtgateclienttest/services/errors.go b/go/cmd/vtgateclienttest/services/errors.go index 570b7f08190..9a4a5e39366 100644 --- a/go/cmd/vtgateclienttest/services/errors.go +++ b/go/cmd/vtgateclienttest/services/errors.go @@ -139,3 +139,17 @@ func (c *errorClient) StreamExecute(ctx context.Context, session *vtgatepb.Sessi } return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback) } + +func (c *errorClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + if err := requestToPartialError(sql, session); err != nil { + return session, nil, err + } + if err := requestToError(sql); err != nil { + return session, nil, err + } + return c.fallbackClient.Prepare(ctx, session, sql, bindVariables) +} + +func (c *errorClient) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + return c.fallbackClient.CloseSession(ctx, session) +} diff --git a/go/cmd/vtgateclienttest/services/fallback.go b/go/cmd/vtgateclienttest/services/fallback.go index 5119fa36587..02f9239260b 100644 --- a/go/cmd/vtgateclienttest/services/fallback.go +++ b/go/cmd/vtgateclienttest/services/fallback.go @@ -52,6 +52,14 @@ func (c fallbackClient) StreamExecute(ctx context.Context, session *vtgatepb.Ses return c.fallback.StreamExecute(ctx, session, sql, bindVariables, callback) } +func (c fallbackClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + return c.fallback.Prepare(ctx, session, sql, bindVariables) +} + +func (c fallbackClient) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + return c.fallback.CloseSession(ctx, session) +} + func (c fallbackClient) ResolveTransaction(ctx context.Context, dtid string) error { return c.fallback.ResolveTransaction(ctx, dtid) } diff --git a/go/cmd/vtgateclienttest/services/terminal.go b/go/cmd/vtgateclienttest/services/terminal.go index 6a8e30fd9da..85fa664c2c2 100644 --- a/go/cmd/vtgateclienttest/services/terminal.go +++ b/go/cmd/vtgateclienttest/services/terminal.go @@ -62,6 +62,14 @@ func (c *terminalClient) StreamExecute(ctx context.Context, session *vtgatepb.Se return errTerminal } +func (c *terminalClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + return session, nil, errTerminal +} + +func (c *terminalClient) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + return errTerminal +} + func (c *terminalClient) ResolveTransaction(ctx context.Context, dtid string) error { return errTerminal } diff --git a/go/vt/vtgate/grpcvtgateconn/suite_test.go b/go/vt/vtgate/grpcvtgateconn/suite_test.go index 615e122a32b..dfde6e26e27 100644 --- a/go/vt/vtgate/grpcvtgateconn/suite_test.go +++ b/go/vt/vtgate/grpcvtgateconn/suite_test.go @@ -200,6 +200,16 @@ func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb return nil } +// Prepare is part of the VTGateService interface +func (f *fakeVTGateService) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + panic("unimplemented") +} + +// CloseSession is part of the VTGateService interface +func (f *fakeVTGateService) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + panic("unimplemented") +} + // ResolveTransaction is part of the VTGateService interface func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string) error { if f.hasError { diff --git a/go/vt/vtgate/vtgateservice/interface.go b/go/vt/vtgate/vtgateservice/interface.go index 9daf7169a25..3615ab3c431 100644 --- a/go/vt/vtgate/vtgateservice/interface.go +++ b/go/vt/vtgate/vtgateservice/interface.go @@ -36,6 +36,13 @@ type VTGateService interface { Execute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error + // Prepare statement support + Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) + + // CloseSession closes the session, rolling back any implicit transactions. + // This has the same effect as if a "rollback" statement was executed, + // but does not affect the query statistics. + CloseSession(ctx context.Context, session *vtgatepb.Session) error // 2PC support ResolveTransaction(ctx context.Context, dtid string) error From 3fcc1b3c0383d1fdc2882c9cd1ee43649704a719 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 31 May 2021 14:41:39 +0530 Subject: [PATCH 3/3] implement the grpc methods Signed-off-by: Harshit Gangal --- go/cmd/vtgateclienttest/services/echo.go | 12 +++++ go/vt/vitessdriver/fakeserver_test.go | 25 ++++++++++ go/vt/vtgate/fakerpcvtgateconn/conn.go | 30 ++++++++++-- go/vt/vtgate/grpcvtgateconn/conn.go | 37 +++++++++++++++ go/vt/vtgate/grpcvtgateconn/suite_test.go | 57 ++++++++++++++++++++++- go/vt/vtgate/grpcvtgateservice/server.go | 33 +++++++++++++ go/vt/vtgate/vtgateconn/vtgateconn.go | 13 ++++++ 7 files changed, 203 insertions(+), 4 deletions(-) diff --git a/go/cmd/vtgateclienttest/services/echo.go b/go/cmd/vtgateclienttest/services/echo.go index 63102ce692e..3c79ddd4310 100644 --- a/go/cmd/vtgateclienttest/services/echo.go +++ b/go/cmd/vtgateclienttest/services/echo.go @@ -172,3 +172,15 @@ func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletTy return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, flags, callback) } + +func (c *echoClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + if strings.HasPrefix(sql, EchoPrefix) { + return session, echoQueryResult(map[string]interface{}{ + "callerId": callerid.EffectiveCallerIDFromContext(ctx), + "query": sql, + "bindVars": bindVariables, + "session": session, + }).Fields, nil + } + return c.fallbackClient.Prepare(ctx, session, sql, bindVariables) +} diff --git a/go/vt/vitessdriver/fakeserver_test.go b/go/vt/vitessdriver/fakeserver_test.go index ce58fa63ff7..de8be4e6d73 100644 --- a/go/vt/vitessdriver/fakeserver_test.go +++ b/go/vt/vitessdriver/fakeserver_test.go @@ -134,6 +134,31 @@ func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb return nil } +// Prepare is part of the VTGateService interface +func (f *fakeVTGateService) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + execCase, ok := execMap[sql] + if !ok { + return session, nil, fmt.Errorf("no match for: %s", sql) + } + query := &queryExecute{ + SQL: sql, + BindVariables: bindVariables, + Session: session, + } + if !query.Equal(execCase.execQuery) { + return session, nil, fmt.Errorf("Prepare request mismatch: got %+v, want %+v", query, execCase.execQuery) + } + if execCase.session != nil { + proto.Reset(session) + proto.Merge(session, execCase.session) + } + return session, execCase.result.Fields, nil +} + +func (f *fakeVTGateService) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + return nil +} + // ResolveTransaction is part of the VTGateService interface func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string) error { if dtid != dtid2 { diff --git a/go/vt/vtgate/fakerpcvtgateconn/conn.go b/go/vt/vtgate/fakerpcvtgateconn/conn.go index ad655cb645d..611fd1ae769 100644 --- a/go/vt/vtgate/fakerpcvtgateconn/conn.go +++ b/go/vt/vtgate/fakerpcvtgateconn/conn.go @@ -20,13 +20,12 @@ limitations under the License. package fakerpcvtgateconn import ( + "context" "fmt" "io" "math/rand" "reflect" - "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/vtgate/vtgateconn" @@ -158,6 +157,31 @@ func (a *streamExecuteAdapter) Recv() (*sqltypes.Result, error) { return r, nil } +// Prepare please see vtgateconn.Impl.Prepare +func (conn *FakeVTGateConn) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVars map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + response, ok := conn.execMap[sql] + if !ok { + return nil, nil, fmt.Errorf("no match for: %s", sql) + } + query := &queryExecute{ + SQL: sql, + BindVariables: bindVars, + Session: session, + } + if !reflect.DeepEqual(query, response.execQuery) { + return nil, nil, fmt.Errorf( + "Prepare: %+v, want %+v", query, response.execQuery) + } + reply := *response.reply + s := newSession(true, "test_keyspace", []string{}, topodatapb.TabletType_MASTER) + return s, reply.Fields, nil +} + +// CloseSession please see vtgateconn.Impl.CloseSession +func (conn *FakeVTGateConn) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + panic("not implemented") +} + // ResolveTransaction please see vtgateconn.Impl.ResolveTransaction func (conn *FakeVTGateConn) ResolveTransaction(ctx context.Context, dtid string) error { return nil @@ -197,4 +221,4 @@ func newSession( } // Make sure FakeVTGateConn implements vtgateconn.Impl -var _ (vtgateconn.Impl) = (*FakeVTGateConn)(nil) +var _ vtgateconn.Impl = (*FakeVTGateConn)(nil) diff --git a/go/vt/vtgate/grpcvtgateconn/conn.go b/go/vt/vtgate/grpcvtgateconn/conn.go index a82e81c3a3a..3f50cf1fcdd 100644 --- a/go/vt/vtgate/grpcvtgateconn/conn.go +++ b/go/vt/vtgate/grpcvtgateconn/conn.go @@ -163,6 +163,40 @@ func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Ses }, nil } +func (conn *vtgateConn) Prepare(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { + request := &vtgatepb.PrepareRequest{ + CallerId: callerid.EffectiveCallerIDFromContext(ctx), + Session: session, + Query: &querypb.BoundQuery{ + Sql: query, + BindVariables: bindVars, + }, + } + response, err := conn.c.Prepare(ctx, request) + if err != nil { + return session, nil, vterrors.FromGRPC(err) + } + if response.Error != nil { + return response.Session, nil, vterrors.FromVTRPC(response.Error) + } + return response.Session, response.Fields, nil +} + +func (conn *vtgateConn) CloseSession(ctx context.Context, session *vtgatepb.Session) error { + request := &vtgatepb.CloseSessionRequest{ + CallerId: callerid.EffectiveCallerIDFromContext(ctx), + Session: session, + } + response, err := conn.c.CloseSession(ctx, request) + if err != nil { + return vterrors.FromGRPC(err) + } + if response.Error != nil { + return vterrors.FromVTRPC(response.Error) + } + return nil +} + func (conn *vtgateConn) ResolveTransaction(ctx context.Context, dtid string) error { request := &vtgatepb.ResolveTransactionRequest{ CallerId: callerid.EffectiveCallerIDFromContext(ctx), @@ -206,3 +240,6 @@ func (conn *vtgateConn) VStream(ctx context.Context, tabletType topodatapb.Table func (conn *vtgateConn) Close() { conn.cc.Close() } + +// Make sure vtgateConn implements vtgateconn.Impl +var _ vtgateconn.Impl = (*vtgateConn)(nil) diff --git a/go/vt/vtgate/grpcvtgateconn/suite_test.go b/go/vt/vtgate/grpcvtgateconn/suite_test.go index dfde6e26e27..02d73dc0a81 100644 --- a/go/vt/vtgate/grpcvtgateconn/suite_test.go +++ b/go/vt/vtgate/grpcvtgateconn/suite_test.go @@ -202,7 +202,31 @@ func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb // Prepare is part of the VTGateService interface func (f *fakeVTGateService) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { - panic("unimplemented") + if f.hasError { + return session, nil, errTestVtGateError + } + if f.panics { + panic(fmt.Errorf("test forced panic")) + } + f.checkCallerID(ctx, "Prepare") + execCase, ok := execMap[sql] + if !ok { + return session, nil, fmt.Errorf("no match for: %s", sql) + } + query := &queryExecute{ + SQL: sql, + BindVariables: bindVariables, + Session: session, + } + if !query.equal(execCase.execQuery) { + f.t.Errorf("Prepare:\n%+v, want\n%+v", query, execCase.execQuery) + return session, nil, nil + } + if execCase.outSession != nil { + proto.Reset(session) + proto.Merge(session, execCase.outSession) + } + return session, execCase.result.Fields, nil } // CloseSession is part of the VTGateService interface @@ -270,12 +294,14 @@ func RunTests(t *testing.T, impl vtgateconn.Impl, fakeServer vtgateservice.VTGat testExecute(t, session) testStreamExecute(t, session) testExecuteBatch(t, session) + testPrepare(t, session) // force a panic at every call, then test that works fs.panics = true testExecutePanic(t, session) testExecuteBatchPanic(t, session) testStreamExecutePanic(t, session) + testPreparePanic(t, session) fs.panics = false } @@ -294,6 +320,7 @@ func RunErrorTests(t *testing.T, fakeServer vtgateservice.VTGateService) { testExecuteError(t, session, fs) testExecuteBatchError(t, session, fs) testStreamExecuteError(t, session, fs) + testPrepareError(t, session, fs) fs.hasError = false } @@ -472,6 +499,34 @@ func testStreamExecutePanic(t *testing.T, session *vtgateconn.VTGateSession) { expectPanic(t, err) } +func testPrepare(t *testing.T, session *vtgateconn.VTGateSession) { + ctx := newContext() + execCase := execMap["request1"] + _, err := session.Prepare(ctx, execCase.execQuery.SQL, execCase.execQuery.BindVariables) + require.NoError(t, err) + //if !qr.Equal(execCase.result) { + // t.Errorf("Unexpected result from Execute: got\n%#v want\n%#v", qr, execCase.result) + //} + + _, err = session.Prepare(ctx, "none", nil) + require.EqualError(t, err, "no match for: none") +} + +func testPrepareError(t *testing.T, session *vtgateconn.VTGateSession, fake *fakeVTGateService) { + ctx := newContext() + execCase := execMap["errorRequst"] + + _, err := session.Prepare(ctx, execCase.execQuery.SQL, execCase.execQuery.BindVariables) + verifyError(t, err, "Prepare") +} + +func testPreparePanic(t *testing.T, session *vtgateconn.VTGateSession) { + ctx := newContext() + execCase := execMap["request1"] + _, err := session.Prepare(ctx, execCase.execQuery.SQL, execCase.execQuery.BindVariables) + expectPanic(t, err) +} + var testCallerID = &vtrpcpb.CallerID{ Principal: "test_principal", Component: "test_component", diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index d51a1c614fb..a439fa0aae9 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -179,6 +179,39 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream return vterrors.ToGRPC(vtgErr) } +// Prepare is the RPC version of vtgateservice.VTGateService method +func (vtg *VTGate) Prepare(ctx context.Context, request *vtgatepb.PrepareRequest) (response *vtgatepb.PrepareResponse, err error) { + defer vtg.server.HandlePanic(&err) + ctx = withCallerIDContext(ctx, request.CallerId) + + session := request.Session + if session == nil { + session = &vtgatepb.Session{Autocommit: true} + } + + session, fields, err := vtg.server.Prepare(ctx, session, request.Query.Sql, request.Query.BindVariables) + return &vtgatepb.PrepareResponse{ + Fields: fields, + Session: session, + Error: vterrors.ToVTRPC(err), + }, nil +} + +// CloseSession is the RPC version of vtgateservice.VTGateService method +func (vtg *VTGate) CloseSession(ctx context.Context, request *vtgatepb.CloseSessionRequest) (response *vtgatepb.CloseSessionResponse, err error) { + defer vtg.server.HandlePanic(&err) + ctx = withCallerIDContext(ctx, request.CallerId) + + session := request.Session + if session == nil { + session = &vtgatepb.Session{Autocommit: true} + } + err = vtg.server.CloseSession(ctx, session) + return &vtgatepb.CloseSessionResponse{ + Error: vterrors.ToVTRPC(err), + }, nil +} + // ResolveTransaction is the RPC version of vtgateservice.VTGateService method func (vtg *VTGate) ResolveTransaction(ctx context.Context, request *vtgatepb.ResolveTransactionRequest) (response *vtgatepb.ResolveTransactionResponse, err error) { defer vtg.server.HandlePanic(&err) diff --git a/go/vt/vtgate/vtgateconn/vtgateconn.go b/go/vt/vtgate/vtgateconn/vtgateconn.go index 17fe91fef4d..f216e12e6cf 100644 --- a/go/vt/vtgate/vtgateconn/vtgateconn.go +++ b/go/vt/vtgate/vtgateconn/vtgateconn.go @@ -115,6 +115,13 @@ func (sn *VTGateSession) StreamExecute(ctx context.Context, query string, bindVa return sn.impl.StreamExecute(ctx, sn.session, query, bindVars) } +// Prepare performs a VTGate Prepare. +func (sn *VTGateSession) Prepare(ctx context.Context, query string, bindVars map[string]*querypb.BindVariable) ([]*querypb.Field, error) { + session, fields, err := sn.impl.Prepare(ctx, sn.session, query, bindVars) + sn.session = session + return fields, err +} + // // The rest of this file is for the protocol implementations. // @@ -131,6 +138,12 @@ type Impl interface { // StreamExecute executes a streaming query on vtgate. This is a V3 function. StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) + // Prepare returns the fields information for the query as part of supporting prepare statements. + Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) + + // CloseSession closes the session provided by rolling back any active transaction. + CloseSession(ctx context.Context, session *vtgatepb.Session) error + // ResolveTransaction resolves the specified 2pc transaction. ResolveTransaction(ctx context.Context, dtid string) error