Skip to content

Commit

Permalink
database: postgres implementation with tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
KeyboardNerd committed Jul 26, 2017
1 parent 0065dab commit 3e3b9c8
Show file tree
Hide file tree
Showing 30 changed files with 2,987 additions and 2,979 deletions.
271 changes: 271 additions & 0 deletions database/pgsql/ancestry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package pgsql

import (
"database/sql"
"errors"
"fmt"

"strings"

"github.com/coreos/clair/database"
"github.com/coreos/clair/pkg/commonerr"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
)

func (tx *pgSession) UpsertAncestry(ancestry database.Ancestry, features []database.NamespacedFeature, processedBy database.Processors) error {
if ancestry.Name == "" {
log.Warning("Empty ancestry name is not allowed")
return commonerr.NewBadRequestError("could not insert an ancestry with empty name")
}

if len(ancestry.Layers) == 0 {
log.Warning("Empty ancestry is not allowed")
return commonerr.NewBadRequestError("could not insert an ancestry with 0 layers")
}

err := tx.deleteAncestry(ancestry.Name)
if err != nil {
return err
}

var ancestryID int
err = tx.QueryRow(insertAncestry, ancestry.Name).Scan(&ancestryID)
if err != nil {
return handleError("insertAncestry", err)
}

err = tx.insertAncestryLayers(ancestryID, ancestry.Layers)
if err != nil {
return err
}

err = tx.insertAncestryFeatures(ancestryID, features)
if err != nil {
return err
}

return tx.persistProcessors(persistAncestryLister,
"persistAncestryLister",
persistAncestryDetector,
"persistAncestryDetector",
ancestryID, processedBy)
}

func (tx *pgSession) FindAncestry(name string) (database.Ancestry, database.Processors, bool, error) {
ancestry := database.Ancestry{Name: name}
processed := database.Processors{}

var ancestryID int
err := tx.QueryRow(searchAncestry, name).Scan(&ancestryID)
if err != nil {
if err == sql.ErrNoRows {
return ancestry, processed, false, nil
}
return ancestry, processed, false, handleError("searchAncestry", err)
}

ancestry.Layers, err = tx.findAncestryLayers(ancestryID)
if err != nil {
return ancestry, processed, false, err
}

processed.Detectors, err = tx.findProcessors(searchAncestryDetectors, "searchAncestryDetectors", "detector", ancestryID)
if err != nil {
return ancestry, processed, false, err
}

processed.Listers, err = tx.findProcessors(searchAncestryListers, "searchAncestryListers", "lister", ancestryID)
if err != nil {
return ancestry, processed, false, err
}

return ancestry, processed, true, nil
}

func (tx *pgSession) FindAncestryFeatures(name string) (database.AncestryWithFeatures, bool, error) {
var (
awf database.AncestryWithFeatures
ok bool
err error
)
awf.Ancestry, awf.ProcessedBy, ok, err = tx.FindAncestry(name)
if err != nil {
return awf, false, err
}

if !ok {
return awf, false, nil
}

rows, err := tx.Query(searchAncestryFeatures, name)
if err != nil {
return awf, false, handleError("searchAncestryFeatures", err)
}

for rows.Next() {
nf := database.NamespacedFeature{}
err := rows.Scan(&nf.Namespace.Name, &nf.Namespace.VersionFormat, &nf.Feature.Name, &nf.Feature.Version)
if err != nil {
return awf, false, handleError("searchAncestryFeatures", err)
}
nf.Feature.VersionFormat = nf.Namespace.VersionFormat
awf.Features = append(awf.Features, nf)
}

return awf, true, nil
}

func (tx *pgSession) deleteAncestry(name string) error {
result, err := tx.Exec(removeAncestry, name)
if err != nil {
return handleError("removeAncestry", err)
}

_, err = result.RowsAffected()
if err != nil {
return handleError("removeAncestry", err)
}

return nil
}

func (tx *pgSession) findProcessors(query, queryName, processorType string, id int) ([]string, error) {
rows, err := tx.Query(query, id)
if err != nil {
if err == sql.ErrNoRows {
log.Warning("No " + processorType + " are used")
return nil, nil
}
return nil, handleError(queryName, err)
}

var (
processors []string
processor string
)

for rows.Next() {
err := rows.Scan(&processor)
if err != nil {
return nil, handleError(queryName, err)
}
processors = append(processors, processor)
}

return processors, nil
}

func (tx *pgSession) findAncestryLayers(ancestryID int) ([]database.Layer, error) {
rows, err := tx.Query(searchAncestryLayer, ancestryID)
if err != nil {
return nil, handleError("searchAncestryLayer", err)
}
layers := []database.Layer{}
for rows.Next() {
var layer database.Layer
err := rows.Scan(&layer.Hash)
if err != nil {
return nil, handleError("searchAncestryLayer", err)
}
layers = append(layers, layer)
}
return layers, nil
}

func (tx *pgSession) insertAncestryLayers(ancestryID int, layers []database.Layer) error {
layerIDs := map[string]sql.NullInt64{}
for _, l := range layers {
layerIDs[l.Hash] = sql.NullInt64{}
}

layerHashes := []string{}
for hash := range layerIDs {
layerHashes = append(layerHashes, hash)
}

rows, err := tx.Query(searchLayerIDs, pq.Array(layerHashes))
if err != nil {
return handleError("searchLayerIDs", err)
}

for rows.Next() {
var (
layerID sql.NullInt64
layerName string
)
err := rows.Scan(&layerID, &layerName)
if err != nil {
return handleError("searchLayerIDs", err)
}
layerIDs[layerName] = layerID
}

notFound := []string{}
for hash, id := range layerIDs {
if !id.Valid {
notFound = append(notFound, hash)
}
}

if len(notFound) > 0 {
return handleError("searchLayerIDs", fmt.Errorf("Layer %s is not found in database", strings.Join(notFound, ",")))
}

stmt, err := tx.Prepare(copyinAncestryLayer)
if err != nil {
return handleError("copyinAncestryLayer", err)
}

for index, layer := range layers {
_, err := stmt.Exec(ancestryID, index, layerIDs[layer.Hash].Int64)
if err != nil {
return handleError("copyinAncestryLayer", commonerr.CombineErrors(err, stmt.Close()))
}
}

if _, err := stmt.Exec(); err != nil {
return handleError("copyinAncestryLayer", commonerr.CombineErrors(err, stmt.Close()))
}

if err := stmt.Close(); err != nil {
return handleError("copyinAncestryLayer", err)
}
return nil
}

func (tx *pgSession) insertAncestryFeatures(ancestryID int, features []database.NamespacedFeature) error {
featureIDs, err := tx.findNamespacedFeatureIDs(features)
if err != nil {
return err
}

// bulk insert ancestry features
stmtFeatures, err := tx.Prepare(copyinAncestryFeatures)
if err != nil {
return handleError("copyinAncestryFeatures", err)
}

for _, id := range featureIDs {
if !id.Valid {
stmtFeatures.Close()
return errors.New("requested namespaced feature is not in database")
}

_, err := stmtFeatures.Exec(ancestryID, id)
if err != nil {
stmtFeatures.Close()
return handleError("copyinAncestryFeatures", err)
}
}

if _, err := stmtFeatures.Exec(); err != nil {
stmtFeatures.Close()
return handleError("copyinAncestryFeatures", err)
}

if err := stmtFeatures.Close(); err != nil {
return handleError("copyinAncestryFeatures", err)
}
return nil
}
Loading

0 comments on commit 3e3b9c8

Please sign in to comment.