Skip to content

Commit

Permalink
Fix bug about ingest pipeline calling coalesce too early.
Browse files Browse the repository at this point in the history
Bug reported as issue #63.
  • Loading branch information
juagargi committed Apr 17, 2024
1 parent 9e1bb62 commit bd16979
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 52 deletions.
6 changes: 4 additions & 2 deletions cmd/ingest/afterIngestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package main
import (
"context"
"fmt"
"time"

"github.com/netsec-ethz/fpki/pkg/db"
"github.com/netsec-ethz/fpki/pkg/mapserver/updater"
)

func coalescePayloadsForDirtyDomains(ctx context.Context, conn db.Conn) {
fmt.Println("Starting coalescing payloads for modified domains ...")
fmt.Printf("[%s] Starting coalescing payloads for modified domains ...\n",
time.Now().Format(time.StampMilli))
// Use NumDBWriters.
err := updater.CoalescePayloadsForDirtyDomains(ctx, conn)
exitIfError(err)

fmt.Println("Done coalescing.")
fmt.Printf("[%s] Done coalescing.\n", time.Now().Format(time.StampMilli))
}

func updateSMT(ctx context.Context, conn db.Conn) {
Expand Down
61 changes: 35 additions & 26 deletions cmd/ingest/certProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ type CertificateProcessor struct {
updateCertBatch UpdateCertificateFunction // update strategy dependent method
strategy CertificateUpdateStrategy

incomingCh chan *CertificateNode // From the previous processor
incomingBatch chan *CertBatch // Ready to be inserted
doneCh chan struct{}
incomingCh chan *CertificateNode // From the previous processor
batchCh chan *CertBatch // Ready to be inserted
doneCh chan struct{}
// Statistics:
statsTicker *time.Ticker
ReadCerts atomic.Int64
ReadBytes atomic.Int64
UncachedCerts atomic.Int64
Expand Down Expand Up @@ -101,66 +102,71 @@ func NewCertProcessor(conn db.Conn, incoming chan *CertificateNode,
conn: conn,
updateCertBatch: updateFcn,
strategy: strategy,
incomingCh: incoming,
incomingBatch: make(chan *CertBatch),
doneCh: make(chan struct{}),
statsTicker: time.NewTicker(2 * time.Second),
}

p.start()
p.Resume(incoming)
return p
}

// start starts the pipeline.
// Resume starts or continues the pipeline.
// Two stages in this processor: from certificate node to batch, and from batch to DB.
func (p *CertificateProcessor) start() {
func (p *CertificateProcessor) Resume(incoming chan *CertificateNode) {
// Prepare DB for certificate update.
p.PrepareDB()

// Start pipeline.
// Prepare pipeline structure.
// Incoming receives the parsed certificates from the previous pipeline (Processor).
p.incomingCh = incoming
// batchCh receives batches of certificates from the previous channel.
p.batchCh = make(chan *CertBatch)
// doneCh indicates all the batches are updated in DB.
p.doneCh = make(chan struct{})

// Create batches. Two workers are enough.
go func() {
const numWorkers = 2
wg := sync.WaitGroup{}
wg.Add(NumDBWriters)
for w := 0; w < NumDBWriters; w++ {
wg.Add(numWorkers)
for w := 0; w < numWorkers; w++ {
go func() {
defer wg.Done()
p.createBatches()
p.createBatches() // from incomingCh -> batchCh
}()
}
wg.Wait()
// Because the stage is finished, close the output channel:
close(p.incomingBatch)
}()

// Read batches and call the DB update method. Run NumDBWriters workers.
go func() {
wg := sync.WaitGroup{}
wg.Add(NumDBWriters)
for w := 0; w < NumDBWriters; w++ {
w := w
go func() {
defer wg.Done()
for batch := range p.incomingBatch {
p.processBatch(w, batch)
for batch := range p.batchCh {
p.processBatch(w, batch) // from batchCh
}
}()
}
wg.Wait()
// Leave the DB ready again.
p.ConsolidateDB()

// Stop printing the stats.
p.statsTicker.Stop()

// This pipeline is finished, signal it.
p.doneCh <- struct{}{}
}()

// Statistics.
ticker := time.NewTicker(2 * time.Second)
startTime := time.Now()
go func() {
for {
select {
case <-ticker.C:
case <-p.doneCh:
p.doneCh <- struct{}{} // signal again
return
}
<-p.statsTicker.C

writtenCerts := p.ReadCerts.Load()
writtenBytes := p.ReadBytes.Load()
uncachedCerts := p.UncachedCerts.Load()
Expand Down Expand Up @@ -232,12 +238,15 @@ func (p *CertificateProcessor) createBatches() {
for c := range p.incomingCh {
batch.AddCertificate(c)
if batch.IsFull() {
p.incomingBatch <- batch
p.batchCh <- batch
batch = NewCertificateBatch()
}
}
// Last batch (might be empty).
p.incomingBatch <- batch
p.batchCh <- batch

// Because the stage is finished, close the output channel:
close(p.batchCh)
}

func (p *CertificateProcessor) processBatch(workerID int, batch *CertBatch) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func mainFunction() int {
proc.BundleMaxSize = *bundleSize
proc.OnBundleFinished = func() {
// Called for intermediate bundles. Need to coalesce, update SMT and clean dirty.
fmt.Println("Bundle ingestion finished.")
fmt.Println("Another bundle ingestion finished.")
coalescePayloadsForDirtyDomains(ctx, conn)
updateSMT(ctx, conn)
cleanupDirty(ctx, conn)
Expand Down
61 changes: 38 additions & 23 deletions cmd/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Processor struct {

incomingFileCh chan util.CsvFile // New files with certificates to be ingested
certWithChainChan chan *CertWithChainData // After parsing files
nodeChan chan *CertificateNode // After finding parents, to be sent to DB and SMT
batchProcessor *CertificateProcessor // Processes certificate nodes (with parent pointer)
parsedCertCh chan *CertificateNode // After finding parents, to be sent to DB and SMT
certProcessor *CertificateProcessor // Processes certificate nodes (with parent pointer)

BundleMaxSize uint64 // Max # of certs before calling OnBundle
OnBundleFinished func()
Expand All @@ -48,15 +48,15 @@ type CertWithChainData struct {
}

func NewProcessor(conn db.Conn, certUpdateStrategy CertificateUpdateStrategy) *Processor {
nodeChan := make(chan *CertificateNode)
parsedCertCh := make(chan *CertificateNode)
p := &Processor{
Conn: conn,
cache: cache.NewNoCache(),
now: time.Now(),
incomingFileCh: make(chan util.CsvFile),
certWithChainChan: make(chan *CertWithChainData),
nodeChan: nodeChan,
batchProcessor: NewCertProcessor(conn, nodeChan, certUpdateStrategy),
parsedCertCh: parsedCertCh,
certProcessor: NewCertProcessor(conn, parsedCertCh, certUpdateStrategy),

BundleMaxSize: math.MaxUint64, // originally set to "no limit"
OnBundleFinished: func() {}, // originally set to noop
Expand Down Expand Up @@ -97,15 +97,16 @@ func (p *Processor) start() {
close(p.certWithChainChan)
}()

// Process the parsed content into the DB, and from DB into SMT:
// Process the parsed content into the DB.
// OnBundleFinished, called from callForEachBundle, would coalesce certs and update the SMT.
go func() {
var flyingCertCount uint64
for data := range p.certWithChainChan {
certs, certIDs, parentIDs, names := util.UnfoldCert(data.Cert, data.CertID,
data.ChainPayloads, data.ChainIDs)
for i := range certs {
// Add the certificate.
p.nodeChan <- &CertificateNode{
p.parsedCertCh <- &CertificateNode{
CertID: certIDs[i],
Cert: certs[i],
ParentID: parentIDs[i],
Expand All @@ -117,20 +118,18 @@ func (p *Processor) start() {
// "flying" certificates (not coalesced and whose SMT is not updated).
// If so, we need to call the OnBundleFinished callback.
if flyingCertCount == p.BundleMaxSize {
p.OnBundleFinished()
p.callForEachBundle(true)
flyingCertCount = 0
}
}
}

resume := false
if flyingCertCount > 0 {
p.OnBundleFinished()
flyingCertCount = 0
resume = true
}
// This stage has finished, close the output channel:
close(p.nodeChan)

// Wait for the next stage to finish
p.batchProcessor.Wait()
p.callForEachBundle(resume)

// There is no more processing to do, close the errors channel and allow the
// error processor to finish.
Expand All @@ -143,6 +142,22 @@ func (p *Processor) start() {
}()
}

// callForEachBundle finishes the current bundle and runs the per-bundle callback.
//
// It closes parsedCertCh to tell the certificate processor that no more certificates are
// coming, blocks on certProcessor.Wait(), and then invokes OnBundleFinished.
// If resume is true, a new parsedCertCh is created afterwards and handed to
// certProcessor.Resume so that further bundles can be sent; otherwise the function returns
// right after the callback.
func (p *Processor) callForEachBundle(resume bool) {
	// Closing the channel is the "no more certificates" signal for the cert processor.
	close(p.parsedCertCh)

	// Block until the cert processor has finished.
	p.certProcessor.Wait()

	// The actual per-bundle call.
	p.OnBundleFinished()

	if !resume {
		return
	}

	// More bundles will follow: the old channel is closed, so install a fresh one and
	// resume the existing cert processor with it (instead of constructing a new processor).
	nextCh := make(chan *CertificateNode)
	p.parsedCertCh = nextCh
	p.certProcessor.Resume(nextCh)
}

func (p *Processor) Wait() error {
// Close the parsing and incoming channels:
close(p.incomingFileCh)
Expand All @@ -155,7 +170,7 @@ func (p *Processor) Wait() error {
// It blocks until it is accepted.
func (p *Processor) AddGzFiles(fileNames []string) {
// Tell the certificate processor that we have these new files.
p.batchProcessor.TotalFiles.Add(int64(len(fileNames)))
p.certProcessor.TotalFiles.Add(int64(len(fileNames)))
// Parse the file and send it to the CSV parser.
for _, filename := range fileNames {
p.incomingFileCh <- (&util.GzFile{}).WithFile(filename)
Expand All @@ -166,7 +181,7 @@ func (p *Processor) AddGzFiles(fileNames []string) {
// It blocks until it is accepted.
func (p *Processor) AddCsvFiles(fileNames []string) {
// Tell the certificate processor that we have these new files.
p.batchProcessor.TotalFiles.Add(int64(len(fileNames)))
p.certProcessor.TotalFiles.Add(int64(len(fileNames)))
// Parse the file and send it to the CSV parser.
for _, filename := range fileNames {
p.incomingFileCh <- (&util.UncompressedFile{}).WithFile(filename)
Expand Down Expand Up @@ -223,9 +238,9 @@ func (p *Processor) ingestWithCSV(fileReader io.Reader) error {
}

// Update statistics.
p.batchProcessor.ReadBytes.Add(int64(len(rawBytes)))
p.batchProcessor.ReadCerts.Inc()
p.batchProcessor.UncachedCerts.Inc()
p.certProcessor.ReadBytes.Add(int64(len(rawBytes)))
p.certProcessor.ReadCerts.Inc()
p.certProcessor.UncachedCerts.Inc()

if p.now.After(cert.NotAfter) {
return nil
Expand All @@ -241,8 +256,8 @@ func (p *Processor) ingestWithCSV(fileReader io.Reader) error {
return fmt.Errorf("at line %d: %s\n%s", lineNo, err, fields[CertChainColumn])
}
// Update statistics.
p.batchProcessor.ReadBytes.Add(int64(len(rawBytes)))
p.batchProcessor.ReadCerts.Inc()
p.certProcessor.ReadBytes.Add(int64(len(rawBytes)))
p.certProcessor.ReadCerts.Inc()
// Check if the parent certificate is in the cache.
id := common.SHA256Hash32Bytes(rawBytes)
if !p.cache.Contains(&id) {
Expand All @@ -252,7 +267,7 @@ func (p *Processor) ingestWithCSV(fileReader io.Reader) error {
return fmt.Errorf("at line %d: %s\n%s", lineNo, err, fields[CertChainColumn])
}
p.cache.AddIDs([]*common.SHA256Output{&id})
p.batchProcessor.UncachedCerts.Inc()
p.certProcessor.UncachedCerts.Inc()
}
chainIDs[i] = &id
}
Expand Down Expand Up @@ -298,7 +313,7 @@ func (p *Processor) ingestWithCSV(fileReader io.Reader) error {
}
close(recordsChan)
wg.Wait()
p.batchProcessor.TotalFilesRead.Add(1) // one more file had been read
p.certProcessor.TotalFilesRead.Add(1) // one more file had been read

return nil
}
Expand Down

0 comments on commit bd16979

Please sign in to comment.