-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
145 lines (123 loc) · 4.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"os"
"time"
joonix "github.com/joonix/log"
log "github.com/sirupsen/logrus"
)
// Package-level state for the Firestream process.

// Build metadata (version / build info), injected at link time:
// go build -ldflags "-X main.appBuildTime=`date -u '+%Y-%m-%d_%I:%M:%S%p'` -X main.appGitHash=`git rev-parse HEAD`"
var (
	appGitHash   string
	appBuildTime string
)

// Metrics objects; a mutex is required when touching FirebaseMetrics.
var (
	imetrics                   IngestedMetrics
	fmmetrics                  FirebaseMetrics
	maxHugeDifferentialSetting float64
)

// Auth configuration for the CL API and for Navajo.
var (
	authConf       CLAPIOauthConfig
	navajoAuthConf NavajoAuthConfig
)

// navajoReferenceIds holds the CL API Ids -> Cartwheel Ids mapping.
var navajoReferenceIds NavajoAccountData

// Channels shared between goroutines.
var (
	navajoUpdater                 chan bool
	shutdownFirestreamImmediately chan bool
	firestoreAssembly             chan ReportDataStreamV1
	transponderReportsV1          chan TransponderReportDataStreamV1
	videoReportsV1                chan VideoReportDataStreamV1
	eldReportsV1                  chan EldReportDataStreamV1
)

// Tuning knobs.
var (
	maxJSONParseErrors float64
	navajoRebuildTimer time.Duration // triggers navajo id mapping
	websocketTimeout   time.Duration // triggers websocket reset if no data within duration
)

// GCP project config.
var gcpProjectId string
func init() {
// Disable log prefixes such as the default timestamp.
// Prefix text prevents the message from being parsed as JSON.
// A timestamp is added when shipping logs to Cloud Logging.
//log.SetFlags(0) // default golang log pkg - turn off timestamps
//log.SetFormatter(&log.JSONFormatter{}) // default logrus json formatter
log.SetFormatter(joonix.NewFormatter()) // default stackdriver log formatter
// initialize our global channels
initGlobalChannels()
}
// main wires up and runs Firestream.
//
// Sequence:
//  1. Parse configuration from the environment; exit with status 1 on error.
//  2. Create a cancellable root context and hand its cancel func to the
//     signal/close handler.
//  3. Initialise the global metrics and Navajo reference maps, start the
//     Navajo map updater and wait until both ID maps are populated (or the
//     first update has failed).
//  4. Start the assembly router, the transponder and ELD report writers, and
//     the websocket ingestor.
//  5. Block until the root context is cancelled, then exit with status 0.
func main() {
	// get configs from environment
	if err := parseEnvConfigs(); err != nil {
		log.Errorln(err)
		os.Exit(1)
	}

	// Base context object is empty, we'll immediately create a copy
	// of our parent context with a Done channel. This new Done channel
	// is closed when the cancel() func returned is called, and it will
	// also make all copies of this context with their own Done channel
	// return immediately.
	ctx, cancel := context.WithCancel(context.Background())

	// catch sig term and ctrl+c etc, tell our goroutines to return that are spun off ctx
	go setupCloseHandler(cancel)

	// init global metrics objects
	imetrics.transpondersWithNoAccountId = make(map[float64]bool)

	// init global objects which are periodically updated to contain CL API Ids -> Cartwheel Ids
	navajoReferenceIds.clAccountIdMap = make(map[string]string)
	navajoReferenceIds.clDeviceIdMap = make(map[string]string)

	// init goroutine to periodically poll navajo and build account/transponder id maps
	go keepNavajoIdMapsUpdated(ctx)
	navajoUpdater <- true // request navajo map rebuild immediately

	// Make sure our ref maps are available before opening the stream.
	// We can fail on navajo updates occasionally later without issues,
	// but failing at boot means we have no device or account ids to match
	// incoming streaming data with.
	// NOTE(review): these reads of navajoReferenceIds are not synchronised in
	// this function; confirm the updater goroutine's writes are safe to poll
	// like this.
	for len(navajoReferenceIds.clAccountIdMap) == 0 || len(navajoReferenceIds.clDeviceIdMap) == 0 {
		if navajoReferenceIds.failedUpdates > 0 {
			log.Errorln("FATAL: Cannot init connection to Navajo API at startup! Bailing out!")
			shutdownFirestreamImmediately <- true // tell app to log basic metrics before cancelling main context
			break
		}
		log.Debug("Waiting on Navajo ID map population..")
		time.Sleep(2 * time.Second)
	}

	// launch assembly pipeline router
	go firestoreAssemblyRouter(ctx)

	// launch event report_data assembler workers
	for i := 0; i < 5; i++ {
		c, err := createFirestoreClient(ctx)
		if err != nil {
			log.WithError(err).Errorln("ERROR FATAL: Unable to create firestore Report Data clients at Firestream init!")
			shutdownFirestreamImmediately <- true
			// Do not start a writer around a client that failed to initialise.
			continue
		}
		go transponderReportWriterV1(ctx, c)
	}

	/*
		// launch video data assemblers (currently disabled)
		for i := 0; i < 2; i++ {
			c, err := createFirestoreClient(ctx)
			if err != nil {
				log.Errorln("ERROR FATAL: Unable to create firestore Video Data clients at Firestream init!")
				shutdownFirestreamImmediately <- true
			}
			go videoReportWriterV1(ctx, c)
		}
	*/

	// launch eld data assemblers
	for i := 0; i < 2; i++ {
		c, err := createFirestoreClient(ctx)
		if err != nil {
			log.WithError(err).Errorln("ERROR FATAL: Unable to create firestore ELD Data clients at Firestream init!")
			shutdownFirestreamImmediately <- true
			// Do not start a writer around a client that failed to initialise.
			continue
		}
		go eldReportWriterV1(ctx, c)
	}

	// launch websocket ingestion goroutine
	go websocketIngestor(ctx)

	log.Infof("Firestream %s:%s is running...", appBuildTime, appGitHash)

	// Park the main goroutine until the root context is cancelled.
	<-ctx.Done()
	log.Debugln("main(): context.Done() received")
	time.Sleep(200 * time.Millisecond) // allow any final metrics tasks to finish
	log.Exit(0)
}