Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User/ohbitton/add file options #212

Merged
merged 20 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion kusto/ingest/file_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/source"
"github.com/cenkalti/backoff/v4"
)

Expand Down Expand Up @@ -461,6 +462,21 @@ func ClientRequestId(clientRequestId string) FileOption {
}
}

// CompressionType sets the compression type of the source data.
// Use this when the file name does not expose the compression type.
// Declaring a compressed type here tells the client the data is already
// compressed, so it will not be re-compressed before upload (see ShouldCompress).
func CompressionType(compressionType source.CompressionType) FileOption {
	return option{
		// run stores the declared type on the source options; downstream
		// ingestion logic consults it when deciding whether to gzip.
		run: func(p *properties.All) error {
			p.Source.CompressionType = compressionType
			return nil
		},
		clientScopes: QueuedClient | StreamingClient | ManagedClient,
		sourceScope: FromFile | FromReader,
		name: "CompressionType",
	}
}

// RawDataSize is the uncompressed data size. Should be used to communicate the file size to the service for efficient ingestion.
// Also used by managed client in the decision to use queued ingestion instead of streaming (if > 4mb)
func RawDataSize(size int64) FileOption {
Expand All @@ -469,8 +485,8 @@ func RawDataSize(size int64) FileOption {
p.Ingestion.RawDataSize = size
return nil
},
clientScopes: QueuedClient | ManagedClient,
sourceScope: FromFile | FromReader | FromBlob,
clientScopes: StreamingClient | ManagedClient | QueuedClient,
name: "RawDataSize",
}
}
44 changes: 7 additions & 37 deletions kusto/ingest/internal/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,11 @@ import (

"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/source"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
)

// CompressionType is a file's compression type.
type CompressionType int8

// String implements fmt.Stringer.
func (c CompressionType) String() string {
switch c {
case GZIP:
return "gzip"
case ZIP:
return "zip"
}
return "unknown compression type"
}

// MarshalJSON implements json.Marshaler.MarshalJSON.
func (c CompressionType) MarshalJSON() ([]byte, error) {
if c == 0 {
return nil, fmt.Errorf("CTUnknown is an invalid compression type")
}
return []byte(fmt.Sprintf("%q", c.String())), nil
}

//goland:noinspection GoUnusedConst - Part of the API
const (
// CTUnknown indicates that that the compression type was unset.
CTUnknown CompressionType = 0
// CTNone indicates that the file was not compressed.
CTNone CompressionType = 1
// GZIP indicates that the file is GZIP compressed.
GZIP CompressionType = 2
// ZIP indicates that the file is ZIP compressed.
ZIP CompressionType = 3
)

// DataFormat indicates what type of encoding format was used for source data.
// Note: This is very similar to ingest.DataFormat, except this supports more formats.
// We are not using a shared list, because this list is used only internally and is for the
Expand Down Expand Up @@ -229,7 +196,7 @@ func DataFormatDiscovery(fName string) DataFormat {
type All struct {
// Ingestion is a set of properties that are used across all ingestion methods.
Ingestion Ingestion
// Source provides options that are used when doing an ingestion on a filesystem.
// Source provides options that are used to operate on the source data.
Source SourceOptions
// Streaming provides options that are used when doing an ingestion from a stream.
Streaming Streaming
Expand All @@ -243,13 +210,13 @@ type ManagedStreaming struct {
Backoff backoff.BackOff
}

// Streaming provides options that are used when doing an ingestion from a stream.
// Streaming provides options that are used when doing a streaming ingestion.
type Streaming struct {
// ClientRequestID is the client request ID to use for the ingestion.
ClientRequestId string
}

// SourceOptions are options that the user provides about the source file that is going to be uploaded.
// SourceOptions are options that the user provides about the source that is going to be uploaded.
type SourceOptions struct {
// ID allows someone to set the UUID for upload themselves. We aren't providing this option at this time, but here
// when we do.
Expand All @@ -263,6 +230,9 @@ type SourceOptions struct {

// OriginalSource is the path to the original source file, used for deletion.
OriginalSource string

// CompressionType is the type of compression used on the file.
CompressionType source.CompressionType
}

// Ingestion is a JSON serializable set of options that must be provided to the service.
Expand Down
103 changes: 78 additions & 25 deletions kusto/ingest/internal/queued/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/gzip"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/resources"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/utils"
"github.com/Azure/azure-kusto-go/kusto/ingest/source"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand Down Expand Up @@ -151,24 +152,9 @@ func (i *Ingestion) Reader(ctx context.Context, reader io.Reader, props properti
return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
}

shouldCompress := true
if props.Source.OriginalSource != "" {
shouldCompress = utils.CompressionDiscovery(props.Source.OriginalSource) == properties.CTNone
}
if props.Source.DontCompress {
shouldCompress = false
}

extension := "gz"
if !shouldCompress {
if props.Source.OriginalSource != "" {
extension = filepath.Ext(props.Source.OriginalSource)
} else {
extension = props.Ingestion.Additional.Format.String() // Best effort
}
}

blobName := fmt.Sprintf("%s_%s_%s_%s.%s", i.db, i.table, nower(), filepath.Base(uuid.New().String()), extension)
compression := CompressionDiscovery(props.Source.OriginalSource)
shouldCompress := ShouldCompress(&props, compression)
blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(props.Source.OriginalSource), compression, shouldCompress, props.Ingestion.Additional.Format.String())

size := int64(0)

Expand Down Expand Up @@ -331,11 +317,9 @@ var nower = time.Now
// localToBlob copies from a local to an Azure Blobstore blob. It returns the URL of the Blob, the local file info and an
// error if there was one.
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) {
compression := utils.CompressionDiscovery(from)
blobName := fmt.Sprintf("%s_%s_%s_%s_%s", i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from))
if compression == properties.CTNone {
blobName = blobName + ".gz"
}
compression := CompressionDiscovery(from)
shouldCompress := ShouldCompress(props, compression)
blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from), compression, shouldCompress, props.Ingestion.Additional.Format.String())

file, err := os.Open(from)
if err != nil {
Expand All @@ -356,7 +340,7 @@ func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob
).SetNoRetry()
}

if compression == properties.CTNone && !props.Source.DontCompress {
if shouldCompress {
gstream := gzip.New()
gstream.Reset(file)

Expand Down Expand Up @@ -396,6 +380,75 @@ func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob
return fullUrl(client, container, blobName), stat.Size(), nil
}

// CompressionDiscovery looks at the file extension. If it is one we support, we return that
// CompressionType that represents that value. Otherwise we return CTNone to indicate that the
// file should not be compressed.
func CompressionDiscovery(fName string) source.CompressionType {
ohadbitt marked this conversation as resolved.
Show resolved Hide resolved
if fName == "" {
return source.CTUnknown
}

var ext string
if strings.HasPrefix(strings.ToLower(fName), "http") {
ext = strings.ToLower(filepath.Ext(filepath.Base(fName)))
} else {
ext = strings.ToLower(filepath.Ext(fName))
}

switch ext {
case ".gz":
return source.GZIP
case ".zip":
return source.ZIP
}
return source.CTNone
}

// GenBlobName builds the name of the blob that will hold the data to ingest.
// The name encodes the database, table, a timestamp, a unique ID and the
// original file name, and ends with an extension describing the content:
//   - "gz" when the client is going to compress the upload,
//   - the source's existing compression extension when it is already compressed,
//   - otherwise the data format (best effort).
func GenBlobName(databaseName string, tableName string, time time.Time, guid string, fileName string, compressionFileExtension source.CompressionType, shouldCompress bool, dataFormat string) string {
	extension := "gz"
	if !shouldCompress {
		// CTUnknown must fall back to the data format too; its String()
		// form is not a usable file extension.
		if compressionFileExtension == source.CTNone || compressionFileExtension == source.CTUnknown {
			extension = dataFormat
		} else {
			extension = compressionFileExtension.String()
		}
	}

	return fmt.Sprintf("%s_%s_%s_%s_%s.%s", databaseName, tableName, time, guid, fileName, extension)
}

// ShouldCompress reports whether the client should gzip-compress the data
// before uploading. Compression is skipped when:
//   - the user explicitly set DontCompress,
//   - the user declared the source as already compressed via CompressionType,
//   - the file extension shows the source is already compressed, or
//   - the data format is a binary format (AVRO, ApacheAVRO, Parquet, ORC).
func ShouldCompress(props *properties.All, compressionFileExtension source.CompressionType) bool {
	if props.Source.DontCompress {
		return false
	}

	// An explicit user-set CompressionType wins over extension discovery;
	// the discovered extension is only consulted when no type was set.
	if props.Source.CompressionType != source.CTUnknown {
		if props.Source.CompressionType != source.CTNone {
			return false
		}
	} else if compressionFileExtension != source.CTUnknown && compressionFileExtension != source.CTNone {
		return false
	}

	// Go switch cases do not fall through: the original listed each format as
	// a separate empty case, so only ORC actually returned false. Combine the
	// binary formats into one case list so all of them skip compression.
	switch props.Ingestion.Additional.Format {
	case properties.AVRO, properties.ApacheAVRO, properties.Parquet, properties.ORC:
		return false
	}

	return true
}

// This allows mocking the stat func later on
var statFunc = os.Stat

Expand Down
14 changes: 8 additions & 6 deletions kusto/ingest/internal/queued/queued_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/utils"
"github.com/Azure/azure-kusto-go/kusto/ingest/source"

"github.com/stretchr/testify/assert"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
Expand Down Expand Up @@ -56,13 +58,13 @@ func TestCompressionDiscovery(t *testing.T) {

tests := []struct {
input string
want properties.CompressionType
want source.CompressionType
AsafMah marked this conversation as resolved.
Show resolved Hide resolved
}{
{"https://somehost.somedomain.com:8080/v1/somestuff/file.gz", properties.GZIP},
{"https://somehost.somedomain.com:8080/v1/somestuff/file.zip", properties.ZIP},
{"/path/to/a/file.gz", properties.GZIP},
{"/path/to/a/file.zip", properties.ZIP},
{"/path/to/a/file", properties.CTNone},
{"https://somehost.somedomain.com:8080/v1/somestuff/file.gz", source.GZIP},
{"https://somehost.somedomain.com:8080/v1/somestuff/file.zip", source.ZIP},
{"/path/to/a/file.gz", source.GZIP},
{"/path/to/a/file.zip", source.ZIP},
{"/path/to/a/file", source.CTNone},
}

for _, test := range tests {
Expand Down
16 changes: 8 additions & 8 deletions kusto/ingest/internal/utils/ingestion_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"

"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/resources"
"github.com/Azure/azure-kusto-go/kusto/ingest/source"
)

const EstimatedCompressionFactor = 11
Expand Down Expand Up @@ -79,10 +79,10 @@ func FetchBlobSize(fPath string, ctx context.Context, client *http.Client) (size
return *properties.ContentLength, nil
}

func EstimateRawDataSize(compression properties.CompressionType, fileSize int64) int64 {
func EstimateRawDataSize(compression source.CompressionType, fileSize int64) int64 {
switch compression {
case properties.GZIP:
case properties.ZIP:
case source.GZIP:
case source.ZIP:
return fileSize * EstimatedCompressionFactor
}

Expand All @@ -92,7 +92,7 @@ func EstimateRawDataSize(compression properties.CompressionType, fileSize int64)
// CompressionDiscovery looks at the file extension. If it is one we support, we return that
// CompressionType that represents that value. Otherwise we return CTNone to indicate that the
// file should not be compressed.
func CompressionDiscovery(fName string) properties.CompressionType {
func CompressionDiscovery(fName string) source.CompressionType {
var ext string
if strings.HasPrefix(strings.ToLower(fName), "http") {
ext = strings.ToLower(filepath.Ext(path.Base(fName)))
Expand All @@ -102,9 +102,9 @@ func CompressionDiscovery(fName string) properties.CompressionType {

switch ext {
case ".gz":
return properties.GZIP
return source.GZIP
case ".zip":
return properties.ZIP
return source.ZIP
}
return properties.CTNone
return source.CTNone
}
14 changes: 8 additions & 6 deletions kusto/ingest/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/gzip"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/queued"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/utils"
"github.com/Azure/azure-kusto-go/kusto/ingest/source"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
Expand Down Expand Up @@ -100,7 +102,7 @@ func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOpt

if !local {
var size int64
var compressionTypeForEstimation properties.CompressionType
var compressionTypeForEstimation source.CompressionType
if size = props.Ingestion.RawDataSize; size == 0 {
size, err = utils.FetchBlobSize(fPath, ctx, m.queued.client.HttpClient())
if err != nil {
Expand All @@ -111,7 +113,7 @@ func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOpt
props.Ingestion.RawDataSize = utils.EstimateRawDataSize(compressionTypeForEstimation, size)
} else {
// If user sets raw data size we always want to devide it for estimation
compressionTypeForEstimation = properties.CTNone
compressionTypeForEstimation = source.CTNone
}

// File is not compressed and user says its compressed, raw 10 mb -> do
Expand All @@ -129,9 +131,9 @@ func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOpt
return m.managedStreamImpl(ctx, file, props)
}

func shouldUseQueuedIngestBySize(compression properties.CompressionType, fileSize int64) bool {
func shouldUseQueuedIngestBySize(compression source.CompressionType, fileSize int64) bool {
switch compression {
case properties.GZIP, properties.ZIP:
case source.GZIP, source.ZIP:
return fileSize > maxStreamingSize
}

Expand All @@ -153,7 +155,7 @@ func (m *Managed) FromReader(ctx context.Context, reader io.Reader, options ...F

func (m *Managed) managedStreamImpl(ctx context.Context, payload io.ReadCloser, props properties.All) (*Result, error) {
defer payload.Close()
compress := !props.Source.DontCompress
compress := queued.ShouldCompress(&props, source.CTUnknown)
var compressed io.Reader = payload
if compress {
compressed = gzip.Compress(io.NopCloser(payload))
Expand All @@ -167,7 +169,7 @@ func (m *Managed) managedStreamImpl(ctx context.Context, payload io.ReadCloser,
return nil, err
}

if shouldUseQueuedIngestBySize(properties.GZIP, int64(len(buf))) {
if shouldUseQueuedIngestBySize(source.GZIP, int64(len(buf))) {
combinedBuf := io.MultiReader(bytes.NewReader(buf), compressed)
return m.queued.fromReader(ctx, combinedBuf, []FileOption{}, props)
}
Expand Down
Loading
Loading