Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated Workflows From In Memory to Db #33

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions data/scripts/def.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,15 @@ CREATE TABLE IF NOT EXISTS api_keys (
FOREIGN KEY(user_ID) REFERENCES users(id)
);

CREATE TABLE IF NOT EXISTS workflows (
id INTEGER NOT NULL PRIMARY KEY,
topic_name TEXT NOT NULL,
offset INTEGER NOT NULL,
function_name TEXT NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
sink_url TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


2 changes: 1 addition & 1 deletion persistence/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func setupConnection(isDevMode bool) (*Datastore, error) {
var db_file string

if isDevMode {
db_file = ":memory:"
db_file = "file::memory:?cache=shared"
} else {
db_file = os.Getenv("DB_FILE_LOCATION")
}
Expand Down
4 changes: 2 additions & 2 deletions persistence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,10 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error {
deleteQuery := "DELETE FROM sessions ID=?;"
dstore.RWMutex.Lock()
rows, err := dstore.db.Query(selectQuery)
dstore.RWMutex.Unlock()
bdkiran marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var sdb SessionDB
err = rows.Scan(&sdb.ID, &sdb.TimeAccessed)
Expand All @@ -233,7 +233,7 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error {
}
}
}
dstore.RWMutex.Unlock()
rows.Close()
bdkiran marked this conversation as resolved.
Show resolved Hide resolved
err = rows.Err()
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions persistence/stickyConnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func (dstore *Datastore) GetAllEndpoints(userId int) ([]Endpoint, error) {
if err != nil {
return stickey_connections, err
}
defer rows.Close()

for rows.Next() {
var stickey_connection Endpoint
err := rows.Scan(&stickey_connection.RouteID, &stickey_connection.Security_key, &stickey_connection.TopicName, &stickey_connection.LastModified)
Expand All @@ -62,6 +60,7 @@ func (dstore *Datastore) GetAllEndpoints(userId int) ([]Endpoint, error) {
}
stickey_connections = append(stickey_connections, stickey_connection)
}
rows.Close()
return stickey_connections, nil
}

Expand Down
2 changes: 1 addition & 1 deletion persistence/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (dstore *Datastore) getAllUsers() ([]User, error) {
if err != nil {
return users, err
}
defer rows.Close()
for rows.Next() {
var user User
err := rows.Scan(&user.ID, &user.Email, &user.AuthType, &user.CreatedAt, &user.LastModified)
Expand All @@ -41,6 +40,7 @@ func (dstore *Datastore) getAllUsers() ([]User, error) {
}
users = append(users, user)
}
rows.Close()
return users, nil
}

Expand Down
155 changes: 155 additions & 0 deletions persistence/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package persistence

import (
"errors"
"reflect"
"time"
)

type Workflow struct {
Id int `json:"id"`
TopicName string `json:"topicName"`
Offset int `json:"offset"`
FunctionName string `json:"functionName"`
Enabled bool `json:"enabled"`
SinkURL string `json:"sinkURL"`
LastModified time.Time `json:"lastModified,omitempty"`
}

type funcMap map[string]interface{}

var FUNC_MAP = funcMap{}
bdkiran marked this conversation as resolved.
Show resolved Hide resolved

func (workflow Workflow) Call(params ...interface{}) (result interface{}, err error) {
f := reflect.ValueOf(FUNC_MAP[workflow.FunctionName])
if len(params) != f.Type().NumIn() {
err = errors.New("the number of params is out of index")
return
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
res := f.Call(in)
result = res[0].Interface()
return
}

func (dstore *Datastore) GetWorkflow(id int) (Workflow, error) {
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE id=?;"
dstore.RWMutex.RLock()
row := dstore.db.QueryRow(selectQuery, id)
dstore.RWMutex.RUnlock()

var workflow Workflow

err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflow, err
}
return workflow, nil
}

func (dstore *Datastore) GetWorkflows() ([]Workflow, error) {
workflows := []Workflow{}
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows;"
dstore.RWMutex.RLock()
rows, err := dstore.db.Query(selectQuery)
if err != nil {
return workflows, err
}

defer rows.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to how we did in sessions and stickyConnections let's explicitly close tho rows instead of leaving it up to defer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm changing all row-related defer statements while I'm at it

for rows.Next() {
var workflow Workflow
err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflows, err
}
workflows = append(workflows, workflow)
}
dstore.RWMutex.RUnlock()
err = rows.Err()
if err != nil {
return workflows, err
}
return workflows, nil
}

func (dstore *Datastore) GetEnabledWorkflows() ([]Workflow, error) {
workflows := []Workflow{}
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE enabled=true;"
dstore.RWMutex.RLock()
rows, err := dstore.db.Query(selectQuery)

if err != nil {
return workflows, err
}
defer rows.Close()
for rows.Next() {
var workflow Workflow
err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflows, err
}
workflows = append(workflows, workflow)
}
dstore.RWMutex.RUnlock()
return workflows, nil
}

func (dstore *Datastore) InsertWorkflow(workflow Workflow) (time.Time, error) {
insertQuery := `
INSERT INTO workflows(id, topic_name, offset, function_name, sink_url, last_modified)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
RETURNING id, last_modified;
`

dstore.RWMutex.Lock()
row := dstore.db.QueryRow(insertQuery, workflow.Id, workflow.TopicName, workflow.Offset, workflow.FunctionName, workflow.SinkURL)
dstore.RWMutex.Unlock()

var insertedID int
var lastModified time.Time

err := row.Scan(&insertedID, &lastModified)
if err != nil {
return time.Time{}, err
}

return lastModified, nil
}

func (dstore *Datastore) DeleteWorkflow(id int) (int, error) {
deleteQuery := "DELETE FROM workflows WHERE id=?"
dstore.RWMutex.Lock()
res, err := dstore.db.Exec(deleteQuery, id)
dstore.RWMutex.Unlock()
if err != nil {
return 0, err
}
rowsDeleted, err := res.RowsAffected()
if err != nil {
return 0, err
}
return int(rowsDeleted), nil
}

func (dstore *Datastore) UpdateWorkflow(id int) (Workflow, error) {
updateQuery := `
UPDATE workflows
SET enabled = NOT enabled, last_modified = CURRENT_TIMESTAMP
WHERE id = ?
RETURNING id, topic_name, offset, function_name, enabled, sink_url, last_modified;
`

dstore.RWMutex.Lock()
row := dstore.db.QueryRow(updateQuery, id)
dstore.RWMutex.Unlock()
var workflow Workflow
err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflow, err
}
return workflow, err
}
122 changes: 70 additions & 52 deletions services/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,72 @@ import (
"bytes"
"encoding/json"
"errors"
"integrand/persistence"
"log"
"log/slog"
"net/http"
"reflect"
"sync"
"time"
)

var caseTypeMapping = map[string]int{
"Motor Vehicle Accident (MVA)": 4,
"Premises Liability": 15,
"Dog Bite": 14,
"Other": 2,
}

var WORKFLOWS = make([]Workflow, 0)

type Workflow struct {
Id int `json:"id"`
TopicName string `json:"topicName"`
Offset int `json:"offset"`
FunctionName string `json:"functionName"`
Enabled bool `json:"enabled"`
SinkURL string `json:"sinkURL"`
}

type funcMap map[string]interface{}

var FUNC_MAP = funcMap{}
const SLEEP_TIME int = 1
const MULTIPLYER int = 2
const MAX_BACKOFF int = 10

func init() {
// Register all of our functions
FUNC_MAP = map[string]interface{}{
persistence.FUNC_MAP = map[string]interface{}{
"ld_ld_sync": ld_ld_sync,
}
}

func (workflow Workflow) Call(params ...interface{}) (result interface{}, err error) {
f := reflect.ValueOf(FUNC_MAP[workflow.FunctionName])
if len(params) != f.Type().NumIn() {
err = errors.New("the number of params is out of index")
return
func Workflower() error {
log.Println("Workflower started")
for {
time.Sleep(100 * time.Millisecond)
currentWorkflows, _ := GetEnabledWorkflows()

var wg sync.WaitGroup
for _, workflow := range currentWorkflows {
wg.Add(1)
go processWorkflow(&wg, workflow)
}
wg.Wait()
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}

func processWorkflow(wg *sync.WaitGroup, workflow persistence.Workflow) {
defer wg.Done()
sleep_time := SLEEP_TIME
for {
bytes, err := persistence.BROKER.ConsumeMessage(workflow.TopicName, workflow.Offset)
if err != nil {
if err.Error() == "offset out of bounds" {
// This error is returned when we're given an offset thats ahead of the commitlog
slog.Debug(err.Error())
time.Sleep(time.Duration(sleep_time) * time.Second)
continue
} else if err.Error() == "offset does not exist" {
// This error is returned when we look for an offset and it does not exist becuase it can't be avaliable in the commitlog
slog.Warn(err.Error())
time.Sleep(time.Duration(sleep_time) * time.Second)
return // Exit the function, to be re-checked in the next cycle
} else {
slog.Error(err.Error())
return // Something's wrong
}
}
workflow.Call(bytes, workflow.SinkURL)
workflow.Offset++
sleep_time = SLEEP_TIME
}
res := f.Call(in)
result = res[0].Interface()
return
}

var caseTypeMapping = map[string]int{
"Motor Vehicle Accident (MVA)": 4,
"Premises Liability": 15,
"Dog Bite": 14,
"Other": 2,
}

func ld_ld_sync(bytes []byte, sinkURL string) error {
Expand All @@ -75,24 +93,6 @@ func ld_ld_sync(bytes []byte, sinkURL string) error {
return nil
}

// Should move to utils later

func GetOrDefaultString(m map[string]interface{}, key string, defaultStr string) string {
if value, ok := m[key]; ok {
if str, ok := value.(string); ok {
return str
}
}
return defaultStr
}

func GetOrDefaultInt(m map[string]int, key string, defaultInt int) int {
if num, ok := m[key]; ok {
return num
}
return defaultInt
}

func sendLeadToClf(jsonBody map[string]interface{}, sinkURL string) error {
defaultStr := ""

Expand Down Expand Up @@ -139,3 +139,21 @@ func sendLeadToClf(jsonBody map[string]interface{}, sinkURL string) error {
log.Printf("Response Body: %v", responseBody)
return nil
}

// Should move to utils later

func GetOrDefaultString(m map[string]interface{}, key string, defaultStr string) string {
if value, ok := m[key]; ok {
if str, ok := value.(string); ok {
return str
}
}
return defaultStr
}

func GetOrDefaultInt(m map[string]int, key string, defaultInt int) int {
if num, ok := m[key]; ok {
return num
}
return defaultInt
}
Loading