Skip to content

Commit

Permalink
feat: fixed subscription repo
Browse files Browse the repository at this point in the history
  • Loading branch information
jirevwe committed Nov 14, 2024
1 parent aee13bd commit 0ec67a9
Show file tree
Hide file tree
Showing 3 changed files with 454 additions and 407 deletions.
124 changes: 84 additions & 40 deletions database/sqlite3/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"gopkg.in/guregu/null.v4"
"math"
"time"

Expand Down Expand Up @@ -33,23 +34,23 @@ const (

updateSubscription = `
UPDATE subscriptions SET
name=$3,
endpoint_id=$4,
source_id=$5,
alert_config_count=$6,
alert_config_threshold=$7,
retry_config_type=$8,
retry_config_duration=$9,
retry_config_retry_count=$10,
filter_config_event_types=$11,
filter_config_filter_headers=$12,
filter_config_filter_body=$13,
filter_config_filter_is_flattened=$14,
rate_limit_config_count=$15,
rate_limit_config_duration=$16,
function=$17,
updated_at=now()
WHERE id = $1 AND project_id = $2
name=$1,
endpoint_id=$2,
source_id=$3,
alert_config_count=$4,
alert_config_threshold=$5,
retry_config_type=$6,
retry_config_duration=$7,
retry_config_retry_count=$8,
filter_config_event_types=$9,
filter_config_filter_headers=$10,
filter_config_filter_body=$11,
filter_config_filter_is_flattened=$12,
rate_limit_config_count=$13,
rate_limit_config_duration=$14,
function=$15,
updated_at=$16
WHERE id = $17 AND project_id = $18
AND deleted_at IS NULL;
`

Expand Down Expand Up @@ -252,8 +253,8 @@ const (

deleteSubscriptions = `
UPDATE subscriptions SET
deleted_at = NOW()
WHERE id = $1 AND project_id = $2;
deleted_at = $1
WHERE id = $2 AND project_id = $3;
`
)

Expand Down Expand Up @@ -320,13 +321,13 @@ func (s *subscriptionRepo) LoadAllSubscriptionConfig(ctx context.Context, projec
func() {
defer closeWithError(rows)
for rows.Next() {
sub := datastore.Subscription{}
sub := dbSubscription{}
if err = rows.StructScan(&sub); err != nil {
return
}

nullifyEmptyConfig(&sub)
subs[counter] = sub
subs[counter] = *sub.toDatastoreSubscription()
counter++
}

Expand Down Expand Up @@ -411,13 +412,13 @@ func (s *subscriptionRepo) fetchChangedSubscriptionConfig(ctx context.Context, c
func() {
defer closeWithError(rows)
for rows.Next() {
sub := datastore.Subscription{}
sub := dbSubscription{}
if err = rows.StructScan(&sub); err != nil {
return
}

nullifyEmptyConfig(&sub)
subs[counter] = sub
subs[counter] = *sub.toDatastoreSubscription()
counter++
}

Expand Down Expand Up @@ -490,7 +491,7 @@ func (s *subscriptionRepo) CreateSubscription(ctx context.Context, projectID str
return ErrSubscriptionNotCreated
}

_subscription := &datastore.Subscription{}
_subscription := &dbSubscription{}
err = s.db.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscription.UID, projectID).StructScan(_subscription)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -500,7 +501,7 @@ func (s *subscriptionRepo) CreateSubscription(ctx context.Context, projectID str
}

nullifyEmptyConfig(_subscription)
*subscription = *_subscription
*subscription = *_subscription.toDatastoreSubscription()

return nil
}
Expand Down Expand Up @@ -529,11 +530,10 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID str
fc.Filter.IsFlattened = true // this is just a flag so we can identify old records

result, err := s.db.ExecContext(
ctx, updateSubscription, subscription.UID, projectID,
subscription.Name, subscription.EndpointID, sourceID,
ac.Count, ac.Threshold, rc.Type, rc.Duration, rc.RetryCount,
fc.EventTypes, fc.Filter.Headers, fc.Filter.Body, fc.Filter.IsFlattened,
rlc.Count, rlc.Duration, subscription.Function,
ctx, updateSubscription, subscription.Name, subscription.EndpointID, sourceID,
ac.Count, ac.Threshold, rc.Type, rc.Duration, rc.RetryCount, fc.EventTypes,
fc.Filter.Headers, fc.Filter.Body, fc.Filter.IsFlattened, rlc.Count,
rlc.Duration, subscription.Function, time.Now(), subscription.UID, projectID,
)
if err != nil {
return err
Expand All @@ -548,7 +548,7 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID str
return ErrSubscriptionNotUpdated
}

_subscription := &datastore.Subscription{}
_subscription := &dbSubscription{}
err = s.db.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscription.UID, projectID).StructScan(_subscription)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -558,7 +558,7 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID str
}

nullifyEmptyConfig(_subscription)
*subscription = *_subscription
*subscription = *_subscription.toDatastoreSubscription()

return nil
}
Expand Down Expand Up @@ -668,7 +668,7 @@ func (s *subscriptionRepo) LoadSubscriptionsPaged(ctx context.Context, projectID
}

func (s *subscriptionRepo) DeleteSubscription(ctx context.Context, projectID string, subscription *datastore.Subscription) error {
result, err := s.db.ExecContext(ctx, deleteSubscriptions, subscription.UID, projectID)
result, err := s.db.ExecContext(ctx, deleteSubscriptions, time.Now(), subscription.UID, projectID)
if err != nil {
return err
}
Expand All @@ -686,7 +686,7 @@ func (s *subscriptionRepo) DeleteSubscription(ctx context.Context, projectID str
}

func (s *subscriptionRepo) FindSubscriptionByID(ctx context.Context, projectID string, subscriptionID string) (*datastore.Subscription, error) {
subscription := &datastore.Subscription{}
subscription := &dbSubscription{}
err := s.db.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscriptionID, projectID).StructScan(subscription)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -697,7 +697,7 @@ func (s *subscriptionRepo) FindSubscriptionByID(ctx context.Context, projectID s

nullifyEmptyConfig(subscription)

return subscription, nil
return subscription.toDatastoreSubscription(), nil
}

func (s *subscriptionRepo) FindSubscriptionsBySourceID(ctx context.Context, projectID string, sourceID string) ([]datastore.Subscription, error) {
Expand Down Expand Up @@ -727,7 +727,7 @@ func (s *subscriptionRepo) FindSubscriptionsByEndpointID(ctx context.Context, pr
}

func (s *subscriptionRepo) FindSubscriptionByDeviceID(ctx context.Context, projectId string, deviceID string, subscriptionType datastore.SubscriptionType) (*datastore.Subscription, error) {
subscription := &datastore.Subscription{}
subscription := &dbSubscription{}
err := s.db.QueryRowxContext(ctx, fetchSubscriptionByDeviceID, deviceID, projectId, subscriptionType).StructScan(subscription)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -739,7 +739,7 @@ func (s *subscriptionRepo) FindSubscriptionByDeviceID(ctx context.Context, proje

nullifyEmptyConfig(subscription)

return subscription, nil
return subscription.toDatastoreSubscription(), nil
}

func (s *subscriptionRepo) FindCLISubscriptions(ctx context.Context, projectID string) ([]datastore.Subscription, error) {
Expand Down Expand Up @@ -810,7 +810,7 @@ var (
emptyRateLimitConfig = datastore.RateLimitConfiguration{}
)

func nullifyEmptyConfig(sub *datastore.Subscription) {
func nullifyEmptyConfig(sub *dbSubscription) {
if sub.AlertConfig != nil && *sub.AlertConfig == emptyAlertConfig {
sub.AlertConfig = nil
}
Expand All @@ -830,15 +830,59 @@ func scanSubscriptions(rows *sqlx.Rows) ([]datastore.Subscription, error) {
defer closeWithError(rows)

for rows.Next() {
sub := datastore.Subscription{}
sub := dbSubscription{}
err = rows.StructScan(&sub)
if err != nil {
return nil, err
}
nullifyEmptyConfig(&sub)

subscriptions = append(subscriptions, sub)
subscriptions = append(subscriptions, *sub.toDatastoreSubscription())
}

return subscriptions, nil
}

type dbSubscription struct {
UID string `db:"id"`
Name string `db:"name"`
Type datastore.SubscriptionType `db:"type"`
ProjectID string `db:"project_id"`
SourceID string `db:"source_id"`
EndpointID string `db:"endpoint_id"`
DeviceID string `db:"device_id"`
Function null.String `db:"function"`
Source *datastore.Source `db:"source_metadata"`
Endpoint *datastore.Endpoint `db:"endpoint_metadata"`
Device *datastore.Device `db:"device_metadata"`
AlertConfig *datastore.AlertConfiguration `db:"alert_config"`
RetryConfig *datastore.RetryConfiguration `db:"retry_config"`
FilterConfig *datastore.FilterConfiguration `db:"filter_config"`
RateLimitConfig *datastore.RateLimitConfiguration `db:"rate_limit_config"`
CreatedAt string `db:"created_at"`
UpdatedAt string `db:"updated_at"`
DeletedAt *string `db:"deleted_at"`
}

func (ss *dbSubscription) toDatastoreSubscription() *datastore.Subscription {
return &datastore.Subscription{
UID: ss.UID,
Name: ss.Name,
Type: ss.Type,
ProjectID: ss.ProjectID,
SourceID: ss.SourceID,
EndpointID: ss.EndpointID,
DeviceID: ss.DeviceID,
Function: ss.Function,
Source: ss.Source,
Endpoint: ss.Endpoint,
Device: ss.Device,
AlertConfig: ss.AlertConfig,
RetryConfig: ss.RetryConfig,
FilterConfig: ss.FilterConfig,
RateLimitConfig: ss.RateLimitConfig,
CreatedAt: asTime(ss.CreatedAt),
UpdatedAt: asTime(ss.UpdatedAt),
DeletedAt: asNullTime(ss.DeletedAt),
}
}
Loading

0 comments on commit 0ec67a9

Please sign in to comment.