diff --git a/data/scripts/def.sql b/data/scripts/def.sql index 6129c0e..6cb54c3 100644 --- a/data/scripts/def.sql +++ b/data/scripts/def.sql @@ -33,3 +33,15 @@ CREATE TABLE IF NOT EXISTS api_keys ( FOREIGN KEY(user_ID) REFERENCES users(id) ); +CREATE TABLE IF NOT EXISTS workflows ( + id INTEGER NOT NULL PRIMARY KEY, + topic_name TEXT NOT NULL, + offset INTEGER NOT NULL, + function_name TEXT NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + sink_url TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + + diff --git a/persistence/apiKeys.go b/persistence/apiKeys.go index f3681d0..e7ee65d 100644 --- a/persistence/apiKeys.go +++ b/persistence/apiKeys.go @@ -66,16 +66,16 @@ func (dstore *Datastore) GetAPIKeysByUserID(userID int) ([]ApiKey, error) { if err != nil { return nil, err } - defer rows.Close() - var apiKeys []ApiKey for rows.Next() { var apiKey ApiKey err := rows.Scan(&apiKey.Id, &apiKey.Key, &apiKey.CreatedAt) if err != nil { + rows.Close() return nil, err } apiKeys = append(apiKeys, apiKey) } + rows.Close() return apiKeys, nil } diff --git a/persistence/database.go b/persistence/database.go index 90e13b5..1810e03 100644 --- a/persistence/database.go +++ b/persistence/database.go @@ -54,7 +54,7 @@ func setupConnection(isDevMode bool) (*Datastore, error) { var db_file string if isDevMode { - db_file = ":memory:" + db_file = "file::memory:?cache=shared" } else { db_file = os.Getenv("DB_FILE_LOCATION") } diff --git a/persistence/session.go b/persistence/session.go index 1b0607d..7367560 100644 --- a/persistence/session.go +++ b/persistence/session.go @@ -216,14 +216,15 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error { deleteQuery := "DELETE FROM sessions ID=?;" dstore.RWMutex.Lock() rows, err := dstore.db.Query(selectQuery) + dstore.RWMutex.Unlock() if err != nil { return err } - defer rows.Close() for rows.Next() { var sdb SessionDB err = rows.Scan(&sdb.ID, &sdb.TimeAccessed) if err != nil { + rows.Close() return err } if (sdb.TimeAccessed.Unix() + maxlifetime) < time.Now().Unix() { @@ -233,7 +234,7 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error { } } } - dstore.RWMutex.Unlock() + rows.Close() err = rows.Err() if err != nil { return err diff --git a/persistence/stickyConnection.go b/persistence/stickyConnection.go index 5aad212..f404a6e 100644 --- a/persistence/stickyConnection.go +++ b/persistence/stickyConnection.go @@ -52,16 +52,16 @@ func (dstore *Datastore) GetAllEndpoints(userId int) ([]Endpoint, error) { if err != nil { return stickey_connections, err } - defer rows.Close() - for rows.Next() { var stickey_connection Endpoint err := rows.Scan(&stickey_connection.RouteID, &stickey_connection.Security_key, &stickey_connection.TopicName, &stickey_connection.LastModified) if err != nil { + rows.Close() return stickey_connections, err } stickey_connections = append(stickey_connections, stickey_connection) } + rows.Close() return stickey_connections, nil } diff --git a/persistence/user.go b/persistence/user.go index e55a720..3e20bf7 100644 --- a/persistence/user.go +++ b/persistence/user.go @@ -32,15 +32,16 @@ func (dstore *Datastore) getAllUsers() ([]User, error) { if err != nil { return users, err } - defer rows.Close() for rows.Next() { var user User err := rows.Scan(&user.ID, &user.Email, &user.AuthType, &user.CreatedAt, &user.LastModified) if err != nil { + rows.Close() return users, err } users = append(users, user) } + rows.Close() return users, nil } diff --git a/persistence/workflow.go b/persistence/workflow.go new file mode 100644 index 0000000..9ac4381 --- /dev/null +++ b/persistence/workflow.go @@ -0,0 +1,155 @@ +package persistence + +import ( + "errors" + "reflect" + "time" +) + +type Workflow struct { + Id int `json:"id"` + TopicName string `json:"topicName"` + Offset int `json:"offset"` + FunctionName string `json:"functionName"` + Enabled bool `json:"enabled"` + SinkURL string `json:"sinkURL"` + LastModified time.Time `json:"lastModified,omitempty"` +} + +type funcMap map[string]interface{} + +var FUNC_MAP = funcMap{} + +func (workflow Workflow) Call(params ...interface{}) (result interface{}, err error) { + f := reflect.ValueOf(FUNC_MAP[workflow.FunctionName]) + if len(params) != f.Type().NumIn() { + err = errors.New("the number of params is out of index") + return + } + in := make([]reflect.Value, len(params)) + for k, param := range params { + in[k] = reflect.ValueOf(param) + } + res := f.Call(in) + result = res[0].Interface() + return +} + +func (dstore *Datastore) GetWorkflow(id int) (Workflow, error) { + selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE id=?;" + dstore.RWMutex.RLock() + row := dstore.db.QueryRow(selectQuery, id) + dstore.RWMutex.RUnlock() + + var workflow Workflow + + err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified) + if err != nil { + return workflow, err + } + return workflow, nil +} + +func (dstore *Datastore) GetWorkflows() ([]Workflow, error) { + workflows := []Workflow{} + selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows;" + dstore.RWMutex.RLock() + rows, err := dstore.db.Query(selectQuery) + dstore.RWMutex.RUnlock() + if err != nil { + return workflows, err + } + for rows.Next() { + var workflow Workflow + err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified) + if err != nil { + rows.Close() + return workflows, err + } + workflows = append(workflows, workflow) + } + rows.Close() + err = rows.Err() + if err != nil { + return workflows, err + } + return workflows, nil +} + +func (dstore *Datastore) GetEnabledWorkflows() ([]Workflow, error) { + workflows := []Workflow{} + selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE enabled=true;" + dstore.RWMutex.RLock() + rows, err := dstore.db.Query(selectQuery) + dstore.RWMutex.RUnlock() + if err != nil { + return workflows, err + } + for rows.Next() { + var workflow Workflow + err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified) + if err != nil { + rows.Close() + return workflows, err + } + workflows = append(workflows, workflow) + } + rows.Close() + return workflows, nil +} + +func (dstore *Datastore) InsertWorkflow(workflow Workflow) (time.Time, error) { + insertQuery := ` + INSERT INTO workflows(id, topic_name, offset, function_name, sink_url, last_modified) + VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + RETURNING id, last_modified; + ` + + dstore.RWMutex.Lock() + row := dstore.db.QueryRow(insertQuery, workflow.Id, workflow.TopicName, workflow.Offset, workflow.FunctionName, workflow.SinkURL) + dstore.RWMutex.Unlock() + + var insertedID int + var lastModified time.Time + + err := row.Scan(&insertedID, &lastModified) + if err != nil { + return time.Time{}, err + } + + return lastModified, nil +} + +func (dstore *Datastore) DeleteWorkflow(id int) (int, error) { + deleteQuery := "DELETE FROM workflows WHERE id=?" + dstore.RWMutex.Lock() + res, err := dstore.db.Exec(deleteQuery, id) + dstore.RWMutex.Unlock() + if err != nil { + return 0, err + } + rowsDeleted, err := res.RowsAffected() + if err != nil { + return 0, err + } + return int(rowsDeleted), nil +} + +func (dstore *Datastore) UpdateWorkflow(id int) (Workflow, error) { + updateQuery := ` + UPDATE workflows + SET enabled = NOT enabled, last_modified = CURRENT_TIMESTAMP + WHERE id = ? + RETURNING id, topic_name, offset, function_name, enabled, sink_url, last_modified; + ` + + dstore.RWMutex.Lock() + row := dstore.db.QueryRow(updateQuery, id) + dstore.RWMutex.Unlock() + var workflow Workflow + err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified) + if err != nil { + return workflow, err + } + return workflow, err +} diff --git a/services/workflow.go b/services/workflow.go index cda3a40..215d503 100644 --- a/services/workflow.go +++ b/services/workflow.go @@ -4,54 +4,72 @@ import ( "bytes" "encoding/json" "errors" + "integrand/persistence" "log" "log/slog" "net/http" - "reflect" + "sync" + "time" ) -var caseTypeMapping = map[string]int{ - "Motor Vehicle Accident (MVA)": 4, - "Premises Liability": 15, - "Dog Bite": 14, - "Other": 2, -} - -var WORKFLOWS = make([]Workflow, 0) - -type Workflow struct { - Id int `json:"id"` - TopicName string `json:"topicName"` - Offset int `json:"offset"` - FunctionName string `json:"functionName"` - Enabled bool `json:"enabled"` - SinkURL string `json:"sinkURL"` -} - -type funcMap map[string]interface{} - -var FUNC_MAP = funcMap{} +const SLEEP_TIME int = 1 +const MULTIPLYER int = 2 +const MAX_BACKOFF int = 10 func init() { // Register all of our functions - FUNC_MAP = map[string]interface{}{ + persistence.FUNC_MAP = map[string]interface{}{ "ld_ld_sync": ld_ld_sync, } } -func (workflow Workflow) Call(params ...interface{}) (result interface{}, err error) { - f := reflect.ValueOf(FUNC_MAP[workflow.FunctionName]) - if len(params) != f.Type().NumIn() { - err = errors.New("the number of params is out of index") - return +func Workflower() error { + log.Println("Workflower started") + for { + time.Sleep(100 * time.Millisecond) + currentWorkflows, _ := GetEnabledWorkflows() + + var wg sync.WaitGroup + for _, workflow := range currentWorkflows { + wg.Add(1) + go processWorkflow(&wg, workflow) + } + wg.Wait() } - in := make([]reflect.Value, len(params)) - for k, param := range params { - in[k] = reflect.ValueOf(param) +} + +func processWorkflow(wg *sync.WaitGroup, workflow persistence.Workflow) { + defer wg.Done() + sleep_time := SLEEP_TIME + for { + bytes, err := persistence.BROKER.ConsumeMessage(workflow.TopicName, workflow.Offset) + if err != nil { + if err.Error() == "offset out of bounds" { + // This error is returned when we're given an offset thats ahead of the commitlog + slog.Debug(err.Error()) + time.Sleep(time.Duration(sleep_time) * time.Second) + continue + } else if err.Error() == "offset does not exist" { + // This error is returned when we look for an offset and it does not exist becuase it can't be avaliable in the commitlog + slog.Warn(err.Error()) + time.Sleep(time.Duration(sleep_time) * time.Second) + return // Exit the function, to be re-checked in the next cycle + } else { + slog.Error(err.Error()) + return // Something's wrong + } + } + workflow.Call(bytes, workflow.SinkURL) + workflow.Offset++ + sleep_time = SLEEP_TIME } - res := f.Call(in) - result = res[0].Interface() - return +} + +var caseTypeMapping = map[string]int{ + "Motor Vehicle Accident (MVA)": 4, + "Premises Liability": 15, + "Dog Bite": 14, + "Other": 2, } func ld_ld_sync(bytes []byte, sinkURL string) error { @@ -75,24 +93,6 @@ func ld_ld_sync(bytes []byte, sinkURL string) error { return nil } -// Should move to utils later - -func GetOrDefaultString(m map[string]interface{}, key string, defaultStr string) string { - if value, ok := m[key]; ok { - if str, ok := value.(string); ok { - return str - } - } - return defaultStr -} - -func GetOrDefaultInt(m map[string]int, key string, defaultInt int) int { - if num, ok := m[key]; ok { - return num - } - return defaultInt -} - func sendLeadToClf(jsonBody map[string]interface{}, sinkURL string) error { defaultStr := "" @@ -139,3 +139,21 @@ func sendLeadToClf(jsonBody map[string]interface{}, sinkURL string) error { log.Printf("Response Body: %v", responseBody) return nil } + +// Should move to utils later + +func GetOrDefaultString(m map[string]interface{}, key string, defaultStr string) string { + if value, ok := m[key]; ok { + if str, ok := value.(string); ok { + return str + } + } + return defaultStr +} + +func GetOrDefaultInt(m map[string]int, key string, defaultInt int) int { + if num, ok := m[key]; ok { + return num + } + return defaultInt +} diff --git a/services/workflowService.go b/services/workflowService.go index ca848fa..b9e9926 100644 --- a/services/workflowService.go +++ b/services/workflowService.go @@ -5,120 +5,46 @@ import ( "integrand/persistence" "log/slog" "math/rand" - "sync" - "time" ) -const SLEEP_TIME int = 1 -const MULTIPLYER int = 2 -const MAX_BACKOFF int = 10 - -var ( - workflowMu sync.Mutex -) - -func Workflower() error { - for { - workflowMu.Lock() - currentWorkflows := append([]Workflow(nil), WORKFLOWS...) - workflowMu.Unlock() - - var wg sync.WaitGroup - for _, workflow := range currentWorkflows { - wg.Add(1) - go processWorkflow(&wg, workflow) - } - wg.Wait() - - } -} - -func processWorkflow(wg *sync.WaitGroup, workflow Workflow) { - defer wg.Done() - sleep_time := SLEEP_TIME - for { - if !workflow.Enabled { - return - } - - bytes, err := persistence.BROKER.ConsumeMessage(workflow.TopicName, workflow.Offset) - if err != nil { - if err.Error() == "offset out of bounds" { - // This error is returned when we're given an offset thats ahead of the commitlog - slog.Debug(err.Error()) - time.Sleep(time.Duration(sleep_time) * time.Second) - continue - } else if err.Error() == "offset does not exist" { - // This error is returned when we look for an offset and it does not exist becuase it can't be avaliable in the commitlog - slog.Warn(err.Error()) - time.Sleep(time.Duration(sleep_time) * time.Second) - return // Exit the function, to be re-checked in the next cycle - } else { - slog.Error(err.Error()) - return // Something's wrong - } - } - workflow.Call(bytes, workflow.SinkURL) - workflow.Offset++ - sleep_time = SLEEP_TIME - } +func GetWorkflows() ([]persistence.Workflow, error) { + return persistence.DATASTORE.GetWorkflows() } -func GetWorkflows() ([]Workflow, error) { - workflowMu.Lock() - defer workflowMu.Unlock() - return WORKFLOWS, nil +func GetEnabledWorkflows() ([]persistence.Workflow, error) { + return persistence.DATASTORE.GetEnabledWorkflows() } -func DeleteWorkflow(id int) error { - workflowMu.Lock() - defer workflowMu.Unlock() - for i, workflow := range WORKFLOWS { - if workflow.Id == id { - WORKFLOWS = append(WORKFLOWS[:i], WORKFLOWS[i+1:]...) - return nil - } - } - return errors.New("workflow not found") +func DeleteWorkflow(id int) (int, error) { + return persistence.DATASTORE.DeleteWorkflow(id) } -func UpdateWorkflow(id int) (*Workflow, error) { - workflowMu.Lock() - defer workflowMu.Unlock() - for i, workflow := range WORKFLOWS { - if workflow.Id == id { - WORKFLOWS[i].Enabled = !WORKFLOWS[i].Enabled - return &WORKFLOWS[i], nil - } +func UpdateWorkflow(id int) (persistence.Workflow, error) { + workflow, err := persistence.DATASTORE.UpdateWorkflow(id) + if err != nil { + slog.Error("Failed to update workflow", "error", err) + return persistence.Workflow{}, err } - return nil, errors.New("workflow not found") + return workflow, nil } - -func GetWorkflow(id int) (*Workflow, error) { - workflowMu.Lock() - defer workflowMu.Unlock() - for _, workflow := range WORKFLOWS { - if workflow.Id == id { - return &workflow, nil - } - } - return nil, errors.New("workflow not found") +func GetWorkflow(id int) (persistence.Workflow, error) { + return persistence.DATASTORE.GetWorkflow(id) } -func CreateWorkflow(topicName string, functionName string, sinkURL string) (*Workflow, error) { - _, ok := FUNC_MAP[functionName] +func CreateWorkflow(topicName string, functionName string, sinkURL string) (persistence.Workflow, error) { + _, ok := persistence.FUNC_MAP[functionName] if !ok { slog.Error("function not found") - return nil, errors.New("workflow with this functionName: " + functionName + " cannot be created") + return persistence.Workflow{}, errors.New("workflow with this functionName: " + functionName + " cannot be created") } // Get topic to use its offset for workflow creation topic, err := persistence.BROKER.GetTopic(topicName) if err != nil { slog.Error("topic with topicName " + topicName + " not found") - return nil, errors.New("workflow with this functionName: " + functionName + " cannot be created") + return persistence.Workflow{}, errors.New("workflow with this functionName: " + functionName + " cannot be created") } - newWorkflow := Workflow{ + newWorkflow := persistence.Workflow{ Id: rangeIn(0, 100), TopicName: topicName, Offset: topic.OldestOffset, @@ -127,10 +53,12 @@ func CreateWorkflow(topicName string, functionName string, sinkURL string) (*Wor SinkURL: sinkURL, } - workflowMu.Lock() - WORKFLOWS = append(WORKFLOWS, newWorkflow) - workflowMu.Unlock() - return &newWorkflow, nil + last_modified, err := persistence.DATASTORE.InsertWorkflow(newWorkflow) + if err != nil { + return newWorkflow, err + } + newWorkflow.LastModified = last_modified + return newWorkflow, nil } func rangeIn(low, hi int) int { diff --git a/web/workflowApi.go b/web/workflowApi.go index 6d24302..b5c6649 100644 --- a/web/workflowApi.go +++ b/web/workflowApi.go @@ -160,7 +160,7 @@ func (wf *workflowAPI) deleteWorkflow(w http.ResponseWriter, r *http.Request) { apiMessageResponse(w, http.StatusBadRequest, "incorrect request sent") return } - err = services.DeleteWorkflow(id) + _, err = services.DeleteWorkflow(id) if err != nil { slog.Error(err.Error()) apiMessageResponse(w, http.StatusBadRequest, "internal server error")