diff --git a/go.mod b/go.mod index 5c6d720..9284786 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/kotalco/crossover-cache -go 1.21.6 +go 1.21 -require github.com/kotalco/resp v0.0.0-20240204195036-70bfc30e6a35 // indirect +require github.com/kotalco/resp v0.0.0-20240310155233-e8ffa70a7031 diff --git a/go.sum b/go.sum index f0240d6..7ffc9f0 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/kotalco/resp v0.0.0-20240204195036-70bfc30e6a35 h1:Wll2iyjemysZsqTp/kyQJ+ORqtcboVjziV4ZXsEl7sc= -github.com/kotalco/resp v0.0.0-20240204195036-70bfc30e6a35/go.mod h1:E3GFMlS/NGkWPAmS/Ad9h5pWdtgu3/FuE+clsqqFSzs= +github.com/kotalco/resp v0.0.0-20240310155233-e8ffa70a7031 h1:F1VO30pQBmIo3Vmw2LtcjM6qd8ocxatXiUc8SFxAv2Y= +github.com/kotalco/resp v0.0.0-20240310155233-e8ffa70a7031/go.mod h1:+7+oV/yGtCk7K98vCqoFkkhG0gzoyfp/zNMtuSyxn08= diff --git a/vendor/github.com/kotalco/resp/client.go b/vendor/github.com/kotalco/resp/client.go index 94ba25e..7893fdc 100644 --- a/vendor/github.com/kotalco/resp/client.go +++ b/vendor/github.com/kotalco/resp/client.go @@ -10,6 +10,7 @@ import ( ) const ( + PingCmd = "PING\n" SendCmd = "*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n" IncrCmd = "*2\r\n$4\r\nINCR\r\n$%d\r\n%s\r\n" ExpireCmd = "*3\r\n$6\r\nEXPIRE\r\n$%d\r\n%s\r\n$%d\r\n%d\r\n" @@ -22,12 +23,15 @@ type IClient interface { GetConnection() (IConnection, error) ReleaseConnection(conn IConnection) Do(ctx context.Context, command string) (string, error) + Ping(ctx context.Context) (string, error) Set(ctx context.Context, key string, value string) error SetWithTTL(ctx context.Context, key string, value string, ttl int) error Get(ctx context.Context, key string) (string, error) Delete(ctx context.Context, key string) error Incr(ctx context.Context, key string) (int, error) Expire(ctx context.Context, key string, seconds int) (bool, error) + Pool() chan IConnection + PoolSize() int Close() } @@ -73,7 +77,7 @@ func (client *Client) GetConnection() (IConnection, error) { case conn := <-client.pool: return conn, nil default: - // Pool is empty now all connection are being used , create a new connection till some connections get released + // pool is empty now all connection are being used , create a new connection till some connections get released conn, err := NewRedisConnection(client.dialer, client.address, client.auth) if err != nil { return nil, err @@ -85,7 +89,9 @@ func (client *Client) GetConnection() (IConnection, error) { func (client *Client) ReleaseConnection(conn IConnection) { client.mu.Lock() defer client.mu.Unlock() - if len(client.pool) >= client.poolSize { + + err := conn.Ping(context.Background()) + if len(client.pool) >= client.poolSize || err != nil { err := conn.Close() if err != nil { return @@ -130,6 +136,17 @@ func (client *Client) Do(ctx context.Context, command string) (string, error) { } +func (client *Client) Ping(ctx context.Context) (string, error) { + response, err := client.Do(ctx, PingCmd) + if err != nil { + return "", err + } + if response != "PONG" { + return "", errors.New("unexpected response from server") + } + return response, nil +} + func (client *Client) Set(ctx context.Context, key string, value string) error { cmd := fmt.Sprintf(SendCmd, len(key), key, len(value), value) response, err := client.Do(ctx, cmd) @@ -137,7 +154,7 @@ func (client *Client) Set(ctx context.Context, key string, value string) error { return err } if response != "OK" { - return errors.New("unexpected response from server") + return fmt.Errorf("set: unexpected response from server %s", response) } return nil } @@ -152,7 +169,7 @@ func (client *Client) Incr(ctx context.Context, key string) (int, error) { // Parse the response => should be in the format: ":\r\n" for a successful INCR command var newValue int if _, err := fmt.Sscanf(response, ":%d\r\n", &newValue); err != nil { - return 0, errors.New("unexpected response from server") + return 0, fmt.Errorf("incr: unexpected response from server %s", response) } // Return the new value @@ -172,7 +189,7 @@ func (client *Client) Expire(ctx context.Context, key string, seconds int) (bool } else if response == ":0" { return false, nil } else { - return false, errors.New("unexpected response from server") + return false, fmt.Errorf("expire: unexpected response from server %s", response) } } @@ -183,7 +200,7 @@ func (client *Client) SetWithTTL(ctx context.Context, key string, value string, return err } if response != "OK" { - return errors.New("unexpected response from server: " + response) + return fmt.Errorf("setWithTTL: unexpected response from server %s", response) } return nil } @@ -206,15 +223,26 @@ func (client *Client) Delete(ctx context.Context, key string) error { // ":1" for successful deletion of one key. // ":0" If the key does not exist if response != ":1" && response != ":0" { - return errors.New("unexpected response from server") + return fmt.Errorf("delete: unexpected response from server %s", response) } return nil } func (client *Client) Close() { - close(client.pool) - for conn := range client.pool { + client.mu.Lock() + defer client.mu.Unlock() + for len(client.pool) > 0 { + conn := <-client.pool _ = conn.Close() } + close(client.pool) + +} + +func (client *Client) Pool() chan IConnection { + return client.pool +} +func (client *Client) PoolSize() int { + return client.poolSize } diff --git a/vendor/github.com/kotalco/resp/connection.go b/vendor/github.com/kotalco/resp/connection.go index 29c7eb6..6e563e8 100644 --- a/vendor/github.com/kotalco/resp/connection.go +++ b/vendor/github.com/kotalco/resp/connection.go @@ -13,6 +13,7 @@ import ( type IConnection interface { Auth(ctx context.Context, password string) error + Ping(ctx context.Context) error Send(ctx context.Context, command string) error Receive(ctx context.Context) (string, error) Close() error @@ -65,6 +66,31 @@ func (rc *Connection) Auth(ctx context.Context, password string) error { return nil } +func (rc *Connection) Ping(ctx context.Context) error { + // Check if the context has been canceled before attempting the operation + if err := ctx.Err(); err != nil { + return err + } + + // Send the PING command to the Redis server + if err := rc.Send(ctx, "PING"); err != nil { + return err + } + + // Receive the reply from the Redis server + reply, err := rc.Receive(ctx) + if err != nil { + return err + } + + // Check if the reply is a valid PONG response + if reply != "PONG" { + return errors.New("did not receive PONG response") + } + + return nil +} + func (rc *Connection) Send(ctx context.Context, command string) error { if err := ctx.Err(); err != nil { return err diff --git a/vendor/modules.txt b/vendor/modules.txt index e2add0e..c6f39c7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/kotalco/resp v0.0.0-20240204195036-70bfc30e6a35 -## explicit; go 1.21.6 +# github.com/kotalco/resp v0.0.0-20240310155233-e8ffa70a7031 +## explicit; go 1.21 github.com/kotalco/resp