Skip to content

Commit

Permalink
pubsub: add version, and version request/response
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed May 28, 2019
1 parent 2aaebc7 commit 3b3bd0d
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 9 deletions.
1 change: 1 addition & 0 deletions pubsub/democlient/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/decred/dcrdata/explorer/types v1.0.1-0.20190501025527-02d6f8e648f7
github.com/decred/dcrdata/pubsub v1.0.1-0.20190501025527-02d6f8e648f7
github.com/decred/dcrdata/pubsub/types v1.0.1-0.20190501025527-02d6f8e648f7
github.com/decred/dcrdata/semver v1.0.0
github.com/decred/dcrdata/txhelpers v1.0.2-0.20190501025527-02d6f8e648f7 // indirect
github.com/decred/slog v1.0.0
github.com/google/go-cmp v0.3.0 // indirect
Expand Down
24 changes: 21 additions & 3 deletions pubsub/democlient/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
exptypes "github.com/decred/dcrdata/explorer/types"
"github.com/decred/dcrdata/pubsub/psclient"
pstypes "github.com/decred/dcrdata/pubsub/types"
"github.com/decred/dcrdata/semver"
"github.com/decred/slog"
survey "gopkg.in/AlecAivazis/survey.v1"
)
Expand Down Expand Up @@ -112,7 +113,7 @@ func main() {
var a actionData
actionPrompt := &survey.Select{
Message: "What now?",
Options: []string{"subscribe", "unsubscribe", "quit"},
Options: []string{"subscribe", "unsubscribe", "version", "quit"},
}
err := survey.AskOne(actionPrompt, &a.action, nil)
if err != nil {
Expand Down Expand Up @@ -148,7 +149,7 @@ func main() {

err := subscribe(a.data)
if err != nil {
fmt.Printf("Failed to subscribe: %v", err)
log.Printf("Failed to subscribe: %v", err)
continue
}

Expand All @@ -158,10 +159,25 @@ func main() {

err := unsubscribe(a.data)
if err != nil {
fmt.Printf("Failed to unsubscribe: %v", err)
log.Printf("Failed to unsubscribe: %v", err)
continue
}

case "version":
serverVer, err := cl.ServerVersion()
if err != nil {
log.Printf("Failed to get server version: %v", err)
continue
}
log.Printf("Server version: %s\n", serverVer)

clientSemVer := psclient.Version()
serverSemVer := semver.NewSemver(serverVer.Major, serverVer.Minor, serverVer.Patch)
if !semver.Compatible(clientSemVer, serverSemVer) {
log.Printf("WARNING! Server version is %v, but client is version %v",
serverSemVer, clientSemVer)
}

case "quit":
cancel()
os.Exit(0)
Expand All @@ -185,6 +201,8 @@ func main() {
case *pstypes.ResponseMessage:
log.Printf("%s request (ID=%d) success = %v. Data: %v",
m.RequestEventId, m.RequestId, m.Success, m.Data)
case *pstypes.Ver:
log.Printf("Server Version: %v", m)
case string:
// generic "message"
log.Printf("Message (%s): %s", msg.EventId, m)
Expand Down
1 change: 1 addition & 0 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/decred/dcrdata/explorer/types v1.0.1-0.20190416204615-70a58657e02f
github.com/decred/dcrdata/mempool v1.0.0
github.com/decred/dcrdata/pubsub/types v1.0.1-0.20190416204615-70a58657e02f
github.com/decred/dcrdata/semver v1.0.0
github.com/decred/dcrdata/txhelpers v1.0.2-0.20190416161040-1dc819eb191d
github.com/decred/slog v1.0.0
golang.org/x/net v0.0.0-20190415214537-1da14a5a36f2
Expand Down
77 changes: 71 additions & 6 deletions pubsub/psclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@ import (
"time"

exptypes "github.com/decred/dcrdata/explorer/types"
pubsub "github.com/decred/dcrdata/pubsub"
pstypes "github.com/decred/dcrdata/pubsub/types"
"github.com/decred/dcrdata/semver"
"golang.org/x/net/websocket"
)

// Version indicates the semantic version of the pubsub module, to which the
// psclient belongs.
func Version() semver.Semver {
return pubsub.Version()
}

func makeRequestMsg(event string, reqID int64) []byte {
event = strings.Trim(event, `"`)

Expand All @@ -28,8 +36,7 @@ func makeRequestMsg(event string, reqID int64) []byte {

// newSubscribeMsg creates a new subscribe message equivalent to marshaling a
// pstypes.WebSocketMessage with EventId set to "subscribe" and Message set to
// the input string, event. json.Marshal is not used, but tests ensure the
// correct result.
// the input string, event.
func newSubscribeMsg(event string, reqID int64) []byte {
subMsg, err := json.Marshal(pstypes.WebSocketMessage{
EventId: "subscribe",
Expand All @@ -44,8 +51,7 @@ func newSubscribeMsg(event string, reqID int64) []byte {

// newUnsubscribeMsg creates a new unsubscribe message equivalent to marshaling
// a pstypes.WebSocketMessage with EventId set to "unsubscribe" and Message set
// to the input string, event. json.Marshal is not used, but tests ensure the
// correct result.
// to the input string, event.
func newUnsubscribeMsg(event string, reqID int64) []byte {
unsubMsg, err := json.Marshal(pstypes.WebSocketMessage{
EventId: "unsubscribe",
Expand All @@ -58,6 +64,20 @@ func newUnsubscribeMsg(event string, reqID int64) []byte {
return unsubMsg
}

// newServerVersionMsg creates a new server version query with EventId set to
// "version", and request message content generated for the specified reqID.
func newServerVersionMsg(reqID int64) []byte {
verMsg, err := json.Marshal(pstypes.WebSocketMessage{
EventId: "version",
Message: makeRequestMsg("", reqID),
})
if err != nil {
panic(fmt.Sprintf("failed to json.Marshal a WebSocketMessage: %v", err))
}

return verMsg
}

var defaultTimeout = 10 * time.Second

// Opts defines the psclient Client options.
Expand Down Expand Up @@ -108,6 +128,25 @@ func New(url string, ctx context.Context, opts *Opts) (*Client, error) {

go cl.receiver()

// Query for the server's pubsub version.
serverVer, err := cl.ServerVersion()
if err != nil {
cl.Stop()
return nil, fmt.Errorf("failed to get server pubsub version: %v", err)
}
log.Infof("Server pubsub version: %s\n", serverVer)

// Ensure the server's pubsub version (actual) is compatible with the
// client's version (required). This allows the client to have a high minor
// version for equal major versions.
clientSemVer := Version()
serverSemVer := semver.NewSemver(serverVer.Major, serverVer.Minor, serverVer.Patch)
if !semver.Compatible(clientSemVer, serverSemVer) {
cl.Stop()
return nil, fmt.Errorf("server pubsub version is %v, but client is version %v",
serverSemVer, clientSemVer)
}

return cl, nil
}

Expand Down Expand Up @@ -326,18 +365,44 @@ func (c *Client) Unsubscribe(event string) (*pstypes.ResponseMessage, error) {
msg := newUnsubscribeMsg(event, reqID)
defer c.deleteRequestID(reqID)

// Send the subscribe message
// Send the subscribe message.
if err := c.send(msg); err != nil {
return nil, fmt.Errorf("failed to send unsubscribe message: %v", err)
}

// Wait for a response with the requestID
// Wait for a response with the requestID.
resp := <-respChan

// Read the response.
return resp, nil
}

// ServerVersion sends a server version query, and returns the response.
func (c *Client) ServerVersion() (*pstypes.Ver, error) {
respChan, reqID := c.newResponseChan()
msg := newServerVersionMsg(reqID)
defer c.deleteRequestID(reqID)

// Send the server version message.
if err := c.send(msg); err != nil {
return nil, fmt.Errorf("failed to send unsubscribe message: %v", err)
}

// Wait for a response with the requestID
resp := <-respChan
if !resp.Success {
return nil, fmt.Errorf("failed to obtain server version")
}

var ver pstypes.Ver
if err := json.Unmarshal([]byte(resp.Data), &ver); err != nil {
return nil, fmt.Errorf("failed to decode server version response: %v", err)
}

// Read the response.
return &ver, nil
}

// receiveMsgTimeout waits for the specified time Duration for a message,
// returned decoded into a WebSocketMessage.
func (c *Client) receiveMsgTimeout(timeout time.Duration) (*pstypes.WebSocketMessage, error) {
Expand Down
24 changes: 24 additions & 0 deletions pubsub/pubsubhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,18 @@ import (
exptypes "github.com/decred/dcrdata/explorer/types"
"github.com/decred/dcrdata/mempool"
pstypes "github.com/decred/dcrdata/pubsub/types"
"github.com/decred/dcrdata/semver"
"github.com/decred/dcrdata/txhelpers"
"golang.org/x/net/websocket"
)

var version = semver.NewSemver(3, 0, 0)

// Version indicates the semantic version of the pubsub module.
func Version() semver.Semver {
return version
}

const (
wsWriteTimeout = 10 * time.Second
wsReadTimeout = 20 * time.Second
Expand Down Expand Up @@ -80,6 +88,7 @@ type PubSubHub struct {
params *chaincfg.Params
invsMtx sync.RWMutex
invs *exptypes.MempoolInfo
ver pstypes.Ver
}

// NewPubSubHub constructs a PubSubHub given a primary and auxiliary data
Expand All @@ -97,6 +106,9 @@ func NewPubSubHub(dataSource wsDataSource) (*PubSubHub, error) {
params := psh.sourceBase.GetChainParams()
psh.params = params

sv := Version()
psh.ver = pstypes.NewVer(sv.Split())

// Development subsidy address of the current network
devSubsidyAddress, err := dbtypes.DevSubsidyAddress(params)
if err != nil {
Expand Down Expand Up @@ -319,6 +331,18 @@ func (psh *PubSubHub) receiveLoop(conn *connection) {
break
}
respMsg.Data = string(b)
respMsg.Success = true

case "version":
var b []byte
b, err = json.Marshal(psh.ver)
if err != nil {
log.Warn("Invalid JSON message: ", err)
respMsg.Data = "error: Could not encode JSON message"
break
}
respMsg.Data = string(b)
respMsg.Success = true

case "ping":
log.Tracef("We've been pinged: %.40s...", reqEvent)
Expand Down
26 changes: 26 additions & 0 deletions pubsub/types/pubsub_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"encoding/json"
"fmt"
"net"
"strconv"
"strings"
Expand All @@ -10,6 +11,23 @@ import (
exptypes "github.com/decred/dcrdata/explorer/types"
)

// Ver is a json tagged version type.
type Ver struct {
Major uint32 `json:"major"`
Minor uint32 `json:"minor"`
Patch uint32 `json:"patch"`
}

// NewVer creates a Ver from the major/minor/patch version components.
func NewVer(major, minor, patch uint32) Ver {
return Ver{major, minor, patch}
}

// String implements Stringer for Ver.
func (v Ver) String() string {
return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
}

var (
// ErrWsClosed is the error message when a websocket.(*Conn).Close tries to
// close an already closed connection. See Go's src/internal/poll/fd.go.
Expand Down Expand Up @@ -71,6 +89,10 @@ type HubSignal int
const (
SigSubscribe HubSignal = iota
SigUnsubscribe
SigDecodeTx
SigGetMempoolTxs
SigSendTx
SigVersion
SigNewBlock
SigMempoolUpdate
SigPingAndUserCount
Expand All @@ -94,6 +116,10 @@ var Subscriptions = map[string]HubSignal{
var eventIDs = map[HubSignal]string{
SigSubscribe: "subscribe",
SigUnsubscribe: "unsubscribe",
SigDecodeTx: "decodetx",
SigGetMempoolTxs: "getmempooltxs",
SigSendTx: "sendtx",
SigVersion: "getversion",
SigNewBlock: "newblock",
SigMempoolUpdate: "mempool",
SigPingAndUserCount: "ping",
Expand Down

0 comments on commit 3b3bd0d

Please sign in to comment.