diff --git a/benchmark/client/main.go b/benchmark/client/main.go index ff72472c..5822838f 100644 --- a/benchmark/client/main.go +++ b/benchmark/client/main.go @@ -10,7 +10,7 @@ import ( "encoding/binary" "flag" "fmt" - mrand "math/rand" + //mrand "math/rand" "net" "os" "runtime" @@ -98,7 +98,7 @@ func result() { } func startClient(key string, quit chan bool) { - time.Sleep(time.Duration(mrand.Intn(30)) * time.Second) + //time.Sleep(time.Duration(mrand.Intn(30)) * time.Second) conn, err := net.Dial("tcp", os.Args[3]) if err != nil { diff --git a/libs/proto/logic/logic.pb.go b/libs/proto/logic/logic.pb.go index f642875e..09fdca5f 100644 --- a/libs/proto/logic/logic.pb.go +++ b/libs/proto/logic/logic.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: PushsMsg + BroadCastRoom PingArg PingReply ConnArg @@ -21,7 +22,7 @@ package proto import proto1 "github.com/golang/protobuf/proto" -// discarding unused import gogoproto "gogo/protobuf/gogoproto/gogo.pb" +// discarding unused import gogoproto "gogo/protobuf/gogoproto" import io "io" import fmt "fmt" @@ -39,6 +40,16 @@ func (m *PushsMsg) Reset() { *m = PushsMsg{} } func (m *PushsMsg) String() string { return proto1.CompactTextString(m) } func (*PushsMsg) ProtoMessage() {} +type BroadCastRoom struct { + RoomId int32 `protobuf:"varint,1,opt,name=roomId,proto3" json:"roomId,omitempty"` + Msg []byte `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"` + Ensure bool `protobuf:"varint,3,opt,name=ensure,proto3" json:"ensure,omitempty"` +} + +func (m *BroadCastRoom) Reset() { *m = BroadCastRoom{} } +func (m *BroadCastRoom) String() string { return proto1.CompactTextString(m) } +func (*BroadCastRoom) ProtoMessage() {} + type PingArg struct { } @@ -88,195 +99,387 @@ func (m *DisconnReply) Reset() { *m = DisconnReply{} } func (m *DisconnReply) String() string { return proto1.CompactTextString(m) } func (*DisconnReply) ProtoMessage() {} -func init() { -} -func (m *PushsMsg) Unmarshal(data []byte) error { - l := len(data) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Server", wireType) - } - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - m.Server |= (int32(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SubKeys", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := iNdEx + int(stringLen) - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.SubKeys = append(m.SubKeys, string(data[iNdEx:postIndex])) - iNdEx = postIndex - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Msg", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Msg = append([]byte{}, data[iNdEx:postIndex]...) - iNdEx = postIndex - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - iNdEx -= sizeOfWire - skippy, err := skipLogic(data[iNdEx:]) - if err != nil { - return err - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } +func (m *PushsMsg) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err } - - return nil + return data[:n], nil } -func (m *PingArg) Unmarshal(data []byte) error { - l := len(data) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break + +func (m *PushsMsg) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.Server != 0 { + data[i] = 0x8 + i++ + i = encodeVarintLogic(data, i, uint64(m.Server)) + } + if len(m.SubKeys) > 0 { + for _, s := range m.SubKeys { + data[i] = 0x12 + i++ + l = len(s) + for l >= 1<<7 { + data[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ } + data[i] = uint8(l) + i++ + i += copy(data[i:], s) } - fieldNum := int32(wire >> 3) - switch fieldNum { - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - iNdEx -= sizeOfWire - skippy, err := skipLogic(data[iNdEx:]) - if err != nil { - return err - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy + } + if m.Msg != nil { + if len(m.Msg) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintLogic(data, i, uint64(len(m.Msg))) + i += copy(data[i:], m.Msg) } } + return i, nil +} - return nil +func (m *BroadCastRoom) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil } -func (m *PingReply) Unmarshal(data []byte) error { - l := len(data) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } + +func (m *BroadCastRoom) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.RoomId != 0 { + data[i] = 0x8 + i++ + i = encodeVarintLogic(data, i, uint64(m.RoomId)) + } + if m.Msg != nil { + if len(m.Msg) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintLogic(data, i, uint64(len(m.Msg))) + i += copy(data[i:], m.Msg) } - fieldNum := int32(wire >> 3) - switch fieldNum { - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - iNdEx -= sizeOfWire - skippy, err := skipLogic(data[iNdEx:]) - if err != nil { - return err - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy + } + if m.Ensure { + data[i] = 0x18 + i++ + if m.Ensure { + data[i] = 1 + } else { + data[i] = 0 } + i++ } + return i, nil +} - return nil +func (m *PingArg) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil } -func (m *ConnArg) Unmarshal(data []byte) error { - l := len(data) - iNdEx := 0 - for iNdEx < l { + +func (m *PingArg) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *PingReply) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *PingReply) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *ConnArg) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *ConnArg) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if len(m.Token) > 0 { + data[i] = 0xa + i++ + i = encodeVarintLogic(data, i, uint64(len(m.Token))) + i += copy(data[i:], m.Token) + } + if m.Server != 0 { + data[i] = 0x10 + i++ + i = encodeVarintLogic(data, i, uint64(m.Server)) + } + return i, nil +} + +func (m *ConnReply) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *ConnReply) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + data[i] = 0xa + i++ + i = encodeVarintLogic(data, i, uint64(len(m.Key))) + i += copy(data[i:], m.Key) + } + if m.RoomId != 0 { + data[i] = 0x10 + i++ + i = encodeVarintLogic(data, i, uint64(m.RoomId)) + } + return i, nil +} + +func (m *DisconnArg) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DisconnArg) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + data[i] = 0xa + i++ + i = encodeVarintLogic(data, i, uint64(len(m.Key))) + i += copy(data[i:], m.Key) + } + if m.RoomId != 0 { + data[i] = 0x10 + i++ + i = encodeVarintLogic(data, i, uint64(m.RoomId)) + } + return i, nil +} + +func (m *DisconnReply) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DisconnReply) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.Has { + data[i] = 0x8 + i++ + if m.Has { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + return i, nil +} + +func encodeFixed64Logic(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Logic(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLogic(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *PushsMsg) Size() (n int) { + var l int + _ = l + if m.Server != 0 { + n += 1 + sovLogic(uint64(m.Server)) + } + if len(m.SubKeys) > 0 { + for _, s := range m.SubKeys { + l = len(s) + n += 1 + l + sovLogic(uint64(l)) + } + } + if m.Msg != nil { + l = len(m.Msg) + if l > 0 { + n += 1 + l + sovLogic(uint64(l)) + } + } + return n +} + +func (m *BroadCastRoom) Size() (n int) { + var l int + _ = l + if m.RoomId != 0 { + n += 1 + sovLogic(uint64(m.RoomId)) + } + if m.Msg != nil { + l = len(m.Msg) + if l > 0 { + n += 1 + l + sovLogic(uint64(l)) + } + } + if m.Ensure { + n += 2 + } + return n +} + +func (m *PingArg) Size() (n int) { + var l int + _ = l + return n +} + +func (m *PingReply) Size() (n int) { + var l int + _ = l + return n +} + +func (m *ConnArg) Size() (n int) { + var l int + _ = l + l = len(m.Token) + if l > 0 { + n += 1 + l + sovLogic(uint64(l)) + } + if m.Server != 0 { + n += 1 + sovLogic(uint64(m.Server)) + } + return n +} + +func (m *ConnReply) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovLogic(uint64(l)) + } + if m.RoomId != 0 { + n += 1 + sovLogic(uint64(m.RoomId)) + } + return n +} + +func (m *DisconnArg) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovLogic(uint64(l)) + } + if m.RoomId != 0 { + n += 1 + sovLogic(uint64(m.RoomId)) + } + return n +} + +func (m *DisconnReply) Size() (n int) { + var l int + _ = l + if m.Has { + n += 2 + } + return n +} + +func sovLogic(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLogic(x uint64) (n int) { + return sovLogic(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *PushsMsg) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { var wire uint64 for shift := uint(0); ; shift += 7 { if iNdEx >= l { @@ -293,8 +496,24 @@ func (m *ConnArg) Unmarshal(data []byte) error { wireType := int(wire & 0x7) switch fieldNum { case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Server", wireType) + } + m.Server = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Server |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field SubKeys", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -312,23 +531,33 @@ func (m *ConnArg) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Token = string(data[iNdEx:postIndex]) + m.SubKeys = append(m.SubKeys, string(data[iNdEx:postIndex])) iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Server", wireType) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Msg", wireType) } + var byteLen int for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.Server |= (int32(b) & 0x7F) << shift + byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { break } } + if byteLen < 0 { + return ErrInvalidLengthLogic + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Msg = append([]byte{}, data[iNdEx:postIndex]...) + iNdEx = postIndex default: var sizeOfWire int for { @@ -343,6 +572,9 @@ func (m *ConnArg) Unmarshal(data []byte) error { if err != nil { return err } + if skippy < 0 { + return ErrInvalidLengthLogic + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -352,7 +584,7 @@ func (m *ConnArg) Unmarshal(data []byte) error { return nil } -func (m *ConnReply) Unmarshal(data []byte) error { +func (m *BroadCastRoom) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 for iNdEx < l { @@ -367,47 +599,68 @@ func (m *ConnReply) Unmarshal(data []byte) error { if b < 0x80 { break } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RoomId", wireType) + } + m.RoomId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.RoomId |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Msg", wireType) } - var stringLen uint64 + var byteLen int for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { break } } - postIndex := iNdEx + int(stringLen) + if byteLen < 0 { + return ErrInvalidLengthLogic + } + postIndex := iNdEx + byteLen if postIndex > l { return io.ErrUnexpectedEOF } - m.Key = string(data[iNdEx:postIndex]) + m.Msg = append([]byte{}, data[iNdEx:postIndex]...) iNdEx = postIndex - case 2: + case 3: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field RoomId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Ensure", wireType) } + var v int for shift := uint(0); ; shift += 7 { if iNdEx >= l { return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - m.RoomId |= (int32(b) & 0x7F) << shift + v |= (int(b) & 0x7F) << shift if b < 0x80 { break } } + m.Ensure = bool(v != 0) default: var sizeOfWire int for { @@ -422,6 +675,9 @@ func (m *ConnReply) Unmarshal(data []byte) error { if err != nil { return err } + if skippy < 0 { + return ErrInvalidLengthLogic + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -431,7 +687,7 @@ func (m *ConnReply) Unmarshal(data []byte) error { return nil } -func (m *DisconnArg) Unmarshal(data []byte) error { +func (m *PingArg) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 for iNdEx < l { @@ -448,45 +704,7 @@ func (m *DisconnArg) Unmarshal(data []byte) error { } } fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := iNdEx + int(stringLen) - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Key = string(data[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field RoomId", wireType) - } - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - m.RoomId |= (int32(b) & 0x7F) << shift - if b < 0x80 { - break - } - } default: var sizeOfWire int for { @@ -501,6 +719,9 @@ func (m *DisconnArg) Unmarshal(data []byte) error { if err != nil { return err } + if skippy < 0 { + return ErrInvalidLengthLogic + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -510,7 +731,7 @@ func (m *DisconnArg) Unmarshal(data []byte) error { return nil } -func (m *DisconnReply) Unmarshal(data []byte) error { +func (m *PingReply) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 for iNdEx < l { @@ -527,25 +748,7 @@ func (m *DisconnReply) Unmarshal(data []byte) error { } } fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Has", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - v |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - m.Has = bool(v != 0) default: var sizeOfWire int for { @@ -560,6 +763,9 @@ func (m *DisconnReply) Unmarshal(data []byte) error { if err != nil { return err } + if skippy < 0 { + return ErrInvalidLengthLogic + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -569,14 +775,14 @@ func (m *DisconnReply) Unmarshal(data []byte) error { return nil } -func skipLogic(data []byte) (n int, err error) { +func (m *ConnArg) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 for iNdEx < l { var wire uint64 for shift := uint(0); ; shift += 7 { if iNdEx >= l { - return 0, io.ErrUnexpectedEOF + return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ @@ -585,389 +791,389 @@ func skipLogic(data []byte) (n int, err error) { break } } + fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) - switch wireType { - case 0: - for { + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { if iNdEx >= l { - return 0, io.ErrUnexpectedEOF + return io.ErrUnexpectedEOF } + b := data[iNdEx] iNdEx++ - if data[iNdEx-1] < 0x80 { + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { break } } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Token = string(data[iNdEx:postIndex]) + iNdEx = postIndex case 2: - var length int + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Server", wireType) + } + m.Server = 0 for shift := uint(0); ; shift += 7 { if iNdEx >= l { - return 0, io.ErrUnexpectedEOF + return io.ErrUnexpectedEOF } b := data[iNdEx] iNdEx++ - length |= (int(b) & 0x7F) << shift + m.Server |= (int32(b) & 0x7F) << shift if b < 0x80 { break } } - iNdEx += length - return iNdEx, nil - case 3: + default: + var sizeOfWire int for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { + sizeOfWire++ + wire >>= 7 + if wire == 0 { break } - next, err := skipLogic(data[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} -func (m *PushsMsg) Size() (n int) { - var l int - _ = l - if m.Server != 0 { - n += 1 + sovLogic(uint64(m.Server)) - } - if len(m.SubKeys) > 0 { - for _, s := range m.SubKeys { - l = len(s) - n += 1 + l + sovLogic(uint64(l)) - } - } - if m.Msg != nil { - l = len(m.Msg) - if l > 0 { - n += 1 + l + sovLogic(uint64(l)) - } - } - return n -} - -func (m *PingArg) Size() (n int) { - var l int - _ = l - return n -} - -func (m *PingReply) Size() (n int) { - var l int - _ = l - return n -} - -func (m *ConnArg) Size() (n int) { - var l int - _ = l - l = len(m.Token) - if l > 0 { - n += 1 + l + sovLogic(uint64(l)) - } - if m.Server != 0 { - n += 1 + sovLogic(uint64(m.Server)) - } - return n -} - -func (m *ConnReply) Size() (n int) { - var l int - _ = l - l = len(m.Key) - if l > 0 { - n += 1 + l + sovLogic(uint64(l)) - } - if m.RoomId != 0 { - n += 1 + sovLogic(uint64(m.RoomId)) - } - return n -} - -func (m *DisconnArg) Size() (n int) { - var l int - _ = l - l = len(m.Key) - if l > 0 { - n += 1 + l + sovLogic(uint64(l)) - } - if m.RoomId != 0 { - n += 1 + sovLogic(uint64(m.RoomId)) - } - return n -} - -func (m *DisconnReply) Size() (n int) { - var l int - _ = l - if m.Has { - n += 2 - } - return n -} - -func sovLogic(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break + iNdEx -= sizeOfWire + skippy, err := skipLogic(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy } } - return n -} -func sozLogic(x uint64) (n int) { - return sovLogic(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (m *PushsMsg) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} -func (m *PushsMsg) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if m.Server != 0 { - data[i] = 0x8 - i++ - i = encodeVarintLogic(data, i, uint64(m.Server)) - } - if len(m.SubKeys) > 0 { - for _, s := range m.SubKeys { - data[i] = 0x12 - i++ - l = len(s) - for l >= 1<<7 { - data[i] = uint8(uint64(l)&0x7f | 0x80) - l >>= 7 - i++ + return nil +} +func (m *ConnReply) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break } - data[i] = uint8(l) - i++ - i += copy(data[i:], s) } - } - if m.Msg != nil { - if len(m.Msg) > 0 { - data[i] = 0x1a - i++ - i = encodeVarintLogic(data, i, uint64(len(m.Msg))) - i += copy(data[i:], m.Msg) + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RoomId", wireType) + } + m.RoomId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.RoomId |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipLogic(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy } } - return i, nil -} - -func (m *PingArg) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *PingArg) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - return i, nil -} - -func (m *PingReply) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *PingReply) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - return i, nil -} - -func (m *ConnArg) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *ConnArg) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if len(m.Token) > 0 { - data[i] = 0xa - i++ - i = encodeVarintLogic(data, i, uint64(len(m.Token))) - i += copy(data[i:], m.Token) - } - if m.Server != 0 { - data[i] = 0x10 - i++ - i = encodeVarintLogic(data, i, uint64(m.Server)) - } - return i, nil -} -func (m *ConnReply) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil + return nil } - -func (m *ConnReply) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if len(m.Key) > 0 { - data[i] = 0xa - i++ - i = encodeVarintLogic(data, i, uint64(len(m.Key))) - i += copy(data[i:], m.Key) - } - if m.RoomId != 0 { - data[i] = 0x10 - i++ - i = encodeVarintLogic(data, i, uint64(m.RoomId)) +func (m *DisconnArg) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RoomId", wireType) + } + m.RoomId = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.RoomId |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipLogic(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } } - return i, nil -} -func (m *DisconnArg) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil + return nil } - -func (m *DisconnArg) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if len(m.Key) > 0 { - data[i] = 0xa - i++ - i = encodeVarintLogic(data, i, uint64(len(m.Key))) - i += copy(data[i:], m.Key) - } - if m.RoomId != 0 { - data[i] = 0x10 - i++ - i = encodeVarintLogic(data, i, uint64(m.RoomId)) +func (m *DisconnReply) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Has", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Has = bool(v != 0) + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipLogic(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogic + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } } - return i, nil -} -func (m *DisconnReply) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil + return nil } - -func (m *DisconnReply) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if m.Has { - data[i] = 0x8 - i++ - if m.Has { - data[i] = 1 - } else { - data[i] = 0 +func skipLogic(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthLogic + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLogic(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) } - i++ } - return i, nil + panic("unreachable") } -func encodeFixed64Logic(data []byte, offset int, v uint64) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - data[offset+4] = uint8(v >> 32) - data[offset+5] = uint8(v >> 40) - data[offset+6] = uint8(v >> 48) - data[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Logic(data []byte, offset int, v uint32) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - return offset + 4 -} -func encodeVarintLogic(data []byte, offset int, v uint64) int { - for v >= 1<<7 { - data[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - data[offset] = uint8(v) - return offset + 1 -} +var ( + ErrInvalidLengthLogic = fmt.Errorf("proto: negative length found during unmarshaling") +) diff --git a/libs/proto/logic/logic.proto b/libs/proto/logic/logic.proto index afac4f3b..fd6e2445 100644 --- a/libs/proto/logic/logic.proto +++ b/libs/proto/logic/logic.proto @@ -14,6 +14,12 @@ message PushsMsg { bytes msg = 3; } +message BroadCastRoom { + int32 roomId=1; + bytes msg = 2; + bool ensure = 3; +} + message PingArg { } diff --git a/logic/http.go b/logic/http.go index aac61a44..ca1ecd99 100644 --- a/logic/http.go +++ b/logic/http.go @@ -1,14 +1,15 @@ package main import ( - log "code.google.com/p/log4go" "encoding/json" - inet "github.com/Terry-Mao/goim/libs/net" "io/ioutil" "net" "net/http" "strconv" "time" + + log "code.google.com/p/log4go" + inet "github.com/Terry-Mao/goim/libs/net" ) func InitHTTP() (err error) { @@ -19,6 +20,7 @@ func InitHTTP() (err error) { httpServeMux.HandleFunc("/1/push", Push) httpServeMux.HandleFunc("/1/pushs", Pushs) httpServeMux.HandleFunc("/1/push/all", PushAll) + httpServeMux.HandleFunc("/1/push/room", PushRoom) httpServeMux.HandleFunc("/1/server/del", DelServer) httpServeMux.HandleFunc("/1/count", Count) log.Info("start http listen:\"%s\"", Conf.HTTPAddrs[i]) @@ -166,6 +168,43 @@ func Pushs(w http.ResponseWriter, r *http.Request) { return } +func PushRoom(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method Not Allowed", 405) + return + } + var ( + bodyBytes []byte + body string + rid int + err error + param = r.URL.Query() + res = map[string]interface{}{"ret": OK} + ) + defer retPWrite(w, r, res, &body, time.Now()) + if bodyBytes, err = ioutil.ReadAll(r.Body); err != nil { + log.Error("ioutil.ReadAll() failed (%v)", err) + res["ret"] = InternalErr + return + } + body = string(bodyBytes) + ridStr := param.Get("rid") + enable, _ := strconv.ParseBool(param.Get("ensure")) + // push room + if rid, err = strconv.Atoi(ridStr); err != nil { + log.Error("strconv.Atoi(\"%s\") error(%v)", ridStr, err) + res["ret"] = InternalErr + return + } + if err = broadcastRoomKafka(int32(rid), bodyBytes, enable); err != nil { + log.Error("broadcastRoomKafka(\"%s\",\"%s\",\"%d\") error(%s)", rid, body, enable, err) + res["ret"] = InternalErr + return + } + res["ret"] = OK + return +} + func PushAll(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, "Method Not Allowed", 405) @@ -175,7 +214,6 @@ func PushAll(w http.ResponseWriter, r *http.Request) { bodyBytes []byte body string err error - ridStr = r.URL.Query().Get("rid") res = map[string]interface{}{"ret": OK} ) defer retPWrite(w, r, res, &body, time.Now()) @@ -185,25 +223,11 @@ func PushAll(w http.ResponseWriter, r *http.Request) { return } body = string(bodyBytes) - if len(ridStr) > 0 { - // push room - if _, err = strconv.Atoi(ridStr); err != nil { - log.Error("strconv.Atoi(\"%s\") error(%v)", ridStr, err) - res["ret"] = InternalErr - return - } - if err = broadcastRoomKafka(ridStr, bodyBytes); err != nil { - log.Error("broadcastKafka(\"%s\") error(%s)", body, err) - res["ret"] = InternalErr - return - } - } else { - // push all - if err := broadcastKafka(bodyBytes); err != nil { - log.Error("broadcastKafka(\"%s\") error(%s)", body, err) - res["ret"] = InternalErr - return - } + // push all + if err := broadcastKafka(bodyBytes); err != nil { + log.Error("broadcastKafka(\"%s\") error(%s)", body, err) + res["ret"] = InternalErr + return } res["ret"] = OK return diff --git a/logic/job/kafka.go b/logic/job/kafka.go index b45da98c..d97465a5 100644 --- a/logic/job/kafka.go +++ b/logic/job/kafka.go @@ -1,14 +1,16 @@ package main import ( + llog "log" + "os" + "time" + log "code.google.com/p/log4go" "github.com/Shopify/sarama" "github.com/Terry-Mao/goim/libs/define" lproto "github.com/Terry-Mao/goim/libs/proto/logic" "github.com/gogo/protobuf/proto" "github.com/wvanbergen/kafka/consumergroup" - "strconv" - "time" ) const ( @@ -20,6 +22,7 @@ const ( func InitKafka() error { log.Info("start topic:%s consumer", Conf.KafkaTopic) log.Info("consumer group name:%s", KAFKA_GROUP_NAME) + sarama.Logger = llog.New(os.Stdout, "[Sarama] ", llog.LstdFlags) config := consumergroup.NewConfig() config.Offsets.Initial = sarama.OffsetNewest config.Offsets.ProcessingTimeout = OFFSETS_PROCESSING_TIMEOUT_SECONDS @@ -49,18 +52,21 @@ func push(op string, msg []byte) (err error) { if op == define.KAFKA_MESSAGE_MULTI { m := &lproto.PushsMsg{} if err = proto.Unmarshal(msg, m); err != nil { - log.Error("proto.Unmarshal(%s) serverId:%d error(%s)", msg, err) + log.Error("proto.Unmarshal(%s) error(%s)", msg, err) return } mpush(m.Server, m.SubKeys, m.Msg) } else if op == define.KAFKA_MESSAGE_BROADCAST { broadcast(msg) - } else { - if roomId, err := strconv.Atoi(op); err != nil { - log.Warn("strconv.Atoi(\"%s\") error(%v)", op, err) - } else { - broadcastRoom(int32(roomId), msg) + } else if op == define.KAFKA_MESSAGE_BROADCAST_ROOM { + m := &lproto.BroadCastRoom{} + if err = proto.Unmarshal(msg, m); err != nil { + log.Error("proto.Unmarshal(%s) error(%s)", msg, err) + return } + broadcastRoom(int32(m.RoomId), m.Msg, m.Ensure) + } else { + log.Error("unknown operation:%s", op) } return } diff --git a/logic/job/push.go b/logic/job/push.go index 696a4104..512adb82 100644 --- a/logic/job/push.go +++ b/logic/job/push.go @@ -1,10 +1,11 @@ package main import ( + "math/rand" + log "code.google.com/p/log4go" "github.com/Terry-Mao/goim/libs/define" "github.com/Terry-Mao/protorpc" - "math/rand" ) type pushArg struct { @@ -12,6 +13,7 @@ type pushArg struct { SubKeys []string Msg []byte RoomId int32 + Ensure bool } var ( @@ -49,7 +51,7 @@ func mpush(server int32, subkeys []string, msg []byte) { } // mssage broadcast room -func broadcastRoom(roomId int32, msg []byte) { +func broadcastRoom(roomId int32, msg []byte, ensure bool) { var ( c *protorpc.Client ok bool @@ -60,7 +62,7 @@ func broadcastRoom(roomId int32, msg []byte) { if servers, ok = RoomServersMap[roomId]; ok { for serverId, _ = range servers { if c, err = getCometByServerId(serverId); err == nil { - pushChs[rand.Int()%Conf.PushChan] <- &pushArg{C: c, Msg: msg, RoomId: roomId} + pushChs[rand.Int()%Conf.PushChan] <- &pushArg{C: c, Msg: msg, RoomId: roomId, Ensure: ensure} } } } diff --git a/logic/job/room.go b/logic/job/room.go index 1fec5d50..1c586b28 100644 --- a/logic/job/room.go +++ b/logic/job/room.go @@ -1,8 +1,9 @@ package main import ( - "github.com/Terry-Mao/protorpc" "time" + + "github.com/Terry-Mao/protorpc" ) const ( diff --git a/logic/kafka.go b/logic/kafka.go index 7fbd425b..dc20787f 100644 --- a/logic/kafka.go +++ b/logic/kafka.go @@ -1,6 +1,7 @@ package main import ( + log "code.google.com/p/log4go" "github.com/Shopify/sarama" "github.com/Terry-Mao/goim/libs/define" lproto "github.com/Terry-Mao/goim/libs/proto/logic" @@ -12,17 +13,45 @@ const ( ) var ( - producer sarama.SyncProducer + producer sarama.AsyncProducer ) func InitKafka(kafkaAddrs []string) (err error) { config := sarama.NewConfig() - config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.RequiredAcks = sarama.NoResponse config.Producer.Partitioner = sarama.NewRandomPartitioner - producer, err = sarama.NewSyncProducer(kafkaAddrs, config) + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + producer, err = sarama.NewAsyncProducer(kafkaAddrs, config) + go handleSuccess() + go handleError() return } +func handleSuccess() { + var ( + pm *sarama.ProducerMessage + ) + for { + pm = <-producer.Successes() + if pm != nil { + log.Info("producer message success, partition:%d offset:%d key:%s valus:%s", pm.Partition, pm.Offset, pm.Key, pm.Value) + } + } +} + +func handleError() { + var ( + err *sarama.ProducerError + ) + for { + err = <-producer.Errors() + if err != nil { + log.Error("producer message error, partition:%d offset:%d key:%s valus:%s error(%v)", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err) + } + } +} + func mpushKafka(server int32, keys []string, msg []byte) (err error) { var ( vBytes []byte @@ -31,25 +60,23 @@ func mpushKafka(server int32, keys []string, msg []byte) (err error) { if vBytes, err = proto.Marshal(v); err != nil { return } - message := &sarama.ProducerMessage{Topic: KafkaPushsTopic, Key: sarama.StringEncoder(define.KAFKA_MESSAGE_MULTI), Value: sarama.ByteEncoder(vBytes)} - if _, _, err = producer.SendMessage(message); err != nil { - return - } + producer.Input() <- &sarama.ProducerMessage{Topic: KafkaPushsTopic, Key: sarama.StringEncoder(define.KAFKA_MESSAGE_MULTI), Value: sarama.ByteEncoder(vBytes)} return } func broadcastKafka(msg []byte) (err error) { - message := &sarama.ProducerMessage{Topic: KafkaPushsTopic, Key: sarama.StringEncoder(define.KAFKA_MESSAGE_BROADCAST), Value: sarama.ByteEncoder(msg)} - if _, _, err = producer.SendMessage(message); err != nil { - return - } + producer.Input() <- &sarama.ProducerMessage{Topic: KafkaPushsTopic, Key: sarama.StringEncoder(define.KAFKA_MESSAGE_BROADCAST), Value: sarama.ByteEncoder(msg)} return } -func broadcastRoomKafka(ridStr string, msg []byte) (err error) { - message := &sarama.ProducerMessage{Topic: KafkaPushsTopic, Key: sarama.StringEncoder(ridStr), Value: sarama.ByteEncoder(msg)} - if _, _, err = producer.SendMessage(message); err != nil { +func broadcastRoomKafka(rid int32, msg []byte, ensure bool) (err error) { + var ( + vBytes []byte + v = &lproto.BroadCastRoom{RoomId: rid, Msg: msg, Ensure: ensure} + ) + if vBytes, err = proto.Marshal(v); err != nil { return } + producer.Input() <- &sarama.ProducerMessage{Topic: KafkaPushsTopic, Key: sarama.StringEncoder(define.KAFKA_MESSAGE_BROADCAST_ROOM), Value: sarama.ByteEncoder(vBytes)} return }