Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create simple way to leverage bulk #43

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This is a `golang` library for interfacing with `Salesforce` APIs.
### Installing
To start using GO-SFDC, install GO and run `go get`
```
go get -u github.com/g8rswimmer/go-sfdc
go get -u github.com/aheber/go-sfdc
```
This will retrieve the library.

Expand Down
2 changes: 1 addition & 1 deletion bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package bulk
import (
"errors"

"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc/session"
)

const bulk2Endpoint = "/jobs/ingest"
Expand Down
4 changes: 2 additions & 2 deletions bulk/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"testing"

"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc/session"
)

func TestNewResource(t *testing.T) {
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestResource_AllJobs(t *testing.T) {
"operation": "Insert",
"state": "Open",
"systemModstamp": "1/1/1980"
}
}
]
}`
return &http.Response{
Expand Down
37 changes: 20 additions & 17 deletions bulk/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func NewFormatter(job *Job, fields []string) (*Formatter, error) {
}
if _, err := f.sb.WriteString(job.newline()); err != nil {
return nil, err

}

return f, nil
Expand All @@ -52,22 +51,7 @@ func (f *Formatter) Add(records ...Record) error {
}

for _, record := range records {
recFields := record.Fields()
values := make([]string, len(f.fields))
insertNull := record.InsertNull()
for idx, field := range f.fields {
if insertNull {
values[idx] = "#N/A"
} else {
values[idx] = ""
}
if value, ok := recFields[field]; ok {
if value != nil {
values[idx] = fmt.Sprintf("%v", value)
}
}
}
_, err := f.sb.WriteString(strings.Join(values, f.job.delimiter()))
_, err := f.sb.WriteString(f.buildRecordString(record))
if err != nil {
return err
}
Expand All @@ -81,6 +65,25 @@ func (f *Formatter) Add(records ...Record) error {
return nil
}

func (f *Formatter) buildRecordString(record Record) string {
recFields := record.Fields()
values := make([]string, len(f.fields))
insertNull := record.InsertNull()
for idx, field := range f.fields {
if insertNull {
values[idx] = "#N/A"
} else {
values[idx] = ""
}
if value, ok := recFields[field]; ok {
if value != nil {
values[idx] = fmt.Sprintf("%v", value)
}
}
}
return strings.Join(values, f.job.delimiter())
}

// Reader will return a reader of the bulk uploader field record body.
func (f *Formatter) Reader() *strings.Reader {
return strings.NewReader(f.sb.String())
Expand Down
4 changes: 2 additions & 2 deletions bulk/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"strconv"
"strings"

sfdc "github.com/g8rswimmer/go-sfdc"
"github.com/g8rswimmer/go-sfdc/session"
sfdc "github.com/aheber/go-sfdc"
"github.com/aheber/go-sfdc/session"
)

// JobType is the bulk job type.
Expand Down
117 changes: 117 additions & 0 deletions bulk/job_simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package bulk

import (
"io/ioutil"

b64 "encoding/base64"

"github.com/aheber/go-sfdc/session"
)

const (
// A request can provide CSV data that does not in total exceed 150 MB of base64 encoded content.
// https://developer.salesforce.com/docs/atlas.en-us.api_bulk_v2.meta/api_bulk_v2/upload_job_data.htm
maxUploadSizeBytes = 150000000 // I think Salesforce is using this number and not the actual 150MB byte value
)

// ProcessDataAsBulkJobs recieves all the necessary information to upload large amounts of data
// breaking it into individual jobs as needed if they cross the size threshold, returns the created jobs
func ProcessDataAsBulkJobs(session session.ServiceFormatter, options Options, fields []string, data []Record) ([]*Job, error) {
// Get an initial job
jobs := []*Job{}
j, err := openBulkJob(session, options)
if err != nil {
return jobs, err
}
jobs = append(jobs, j)

f, err := NewFormatter(j, fields)
if err != nil {
return jobs, err
}
// Get the by counts of the headers and the line encoding
headers, err := ioutil.ReadAll(f.Reader())
if err != nil {
return jobs, err
}
sizeOfHeaders := len(b64.StdEncoding.EncodeToString(headers))
sizeOfLineEnding := len(b64.StdEncoding.EncodeToString([]byte(options.LineEnding)))

// loop through output from records, analyze the impact of adding another record to the total CSV size
rollingSize := sizeOfHeaders + sizeOfLineEnding
rollingRowCount := 0
for _, v := range data {

// get size of encoded data for this row
sizeOfRow := len(b64.StdEncoding.EncodeToString([]byte(f.buildRecordString(v))))
rollingSize += sizeOfRow + sizeOfLineEnding
// if the new record would push it over the threshold send the CSV to the open job and start a new one
if rollingSize > maxUploadSizeBytes {
// Send the data, close the job, start a new job, make a new formatter, do it again
err := sendDataToJob(j, f)
if err != nil {
return jobs, err
}

// Get a new job to load against
j, err = openBulkJob(session, options)
if err != nil {
return jobs, err
}
jobs = append(jobs, j)

// Build a new formatter and reset counters
f, err = NewFormatter(j, fields)
if err != nil {
return jobs, err
}
rollingSize = sizeOfHeaders + sizeOfLineEnding + sizeOfRow + sizeOfLineEnding
rollingRowCount = 0
}
f.Add(v)
rollingRowCount++
}

// send the last batch
if rollingRowCount > 0 {
err = sendDataToJob(j, f)
if err != nil {
return jobs, err
}
}
return jobs, nil
}

func sendDataToJob(j *Job, f *Formatter) error {
err := j.Upload(f.Reader())
if err != nil {
return err
}

err = closeBulkJob(j)
if err != nil {
return err
}
return nil
}

func openBulkJob(session session.ServiceFormatter, jobOpts Options) (*Job, error) {
resource, err := NewResource(session)
if err != nil {
return &Job{}, err
}

job, err := resource.CreateJob(jobOpts)
if err != nil {
return &Job{}, err
}
return job, nil
}

func closeBulkJob(job *Job) error {
_, err := job.Close()
if err != nil {
return err
}
return nil
}
1 change: 1 addition & 0 deletions bulk/job_simple_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package bulk
8 changes: 4 additions & 4 deletions bulk/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"
"testing"

"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc/session"
)

func TestJob_formatOptions(t *testing.T) {
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestJob_response(t *testing.T) {
"fields" : [ "Id" ],
"message" : "Account ID: id value of incorrect type: 001900K0001pPuOAAU",
"errorCode" : "MALFORMED_ID"
}
}
]`
return &http.Response{
StatusCode: http.StatusBadRequest,
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestJob_setState(t *testing.T) {
"fields" : [ "Id" ],
"message" : "Account ID: id value of incorrect type: 001900K0001pPuOAAU",
"errorCode" : "MALFORMED_ID"
}
}
]`
return &http.Response{
StatusCode: http.StatusBadRequest,
Expand Down Expand Up @@ -895,7 +895,7 @@ func TestJob_infoResponse(t *testing.T) {
"fields" : [ "Id" ],
"message" : "Account ID: id value of incorrect type: 001900K0001pPuOAAU",
"errorCode" : "MALFORMED_ID"
}
}
]`
return &http.Response{
StatusCode: http.StatusBadRequest,
Expand Down
4 changes: 2 additions & 2 deletions bulk/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"net/http"
"strconv"

sfdc "github.com/g8rswimmer/go-sfdc"
"github.com/g8rswimmer/go-sfdc/session"
sfdc "github.com/aheber/go-sfdc"
"github.com/aheber/go-sfdc/session"
)

// Parameters to query all of the bulk jobs.
Expand Down
10 changes: 5 additions & 5 deletions bulk/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"testing"

"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc/session"
)

func TestJobs_do(t *testing.T) {
Expand Down Expand Up @@ -49,7 +49,7 @@ func TestJobs_do(t *testing.T) {
"operation": "Insert",
"state": "Open",
"systemModstamp": "1/1/1980"
}
}
]
}`
return &http.Response{
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestJobs_do(t *testing.T) {
"fields" : [ "Id" ],
"message" : "Account ID: id value of incorrect type: 001900K0001pPuOAAU",
"errorCode" : "MALFORMED_ID"
}
}
]`
return &http.Response{
StatusCode: http.StatusBadRequest,
Expand Down Expand Up @@ -168,7 +168,7 @@ func Test_newJobs(t *testing.T) {
"operation": "Insert",
"state": "Open",
"systemModstamp": "1/1/1980"
}
}
]
}`
return &http.Response{
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestJobs_Next(t *testing.T) {
"operation": "Insert",
"state": "Open",
"systemModstamp": "1/1/1980"
}
}
]
}`
return &http.Response{
Expand Down
4 changes: 2 additions & 2 deletions composite/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"fmt"
"net/http"

"github.com/g8rswimmer/go-sfdc"
"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc"
"github.com/aheber/go-sfdc/session"
)

// Subrequester provides the composite batch API requests.
Expand Down
4 changes: 2 additions & 2 deletions composite/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"
"testing"

"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc/session"
)

type mockSubrequester struct {
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestResource_Retrieve(t *testing.T) {
"fields" : [ "Id" ],
"message" : "Account ID: id value of incorrect type: 001900K0001pPuOAAU",
"errorCode" : "MALFORMED_ID"
}
}
]`
return &http.Response{
StatusCode: http.StatusBadRequest,
Expand Down
4 changes: 2 additions & 2 deletions composite/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"fmt"
"net/http"

"github.com/g8rswimmer/go-sfdc"
"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc"
"github.com/aheber/go-sfdc/session"
)

// Subrequester provides the composite API requests. The
Expand Down
4 changes: 2 additions & 2 deletions composite/composite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
"testing"

"github.com/g8rswimmer/go-sfdc/session"
"github.com/aheber/go-sfdc/session"
)

type mockSubrequester struct {
Expand Down Expand Up @@ -449,7 +449,7 @@ func TestResource_Retrieve(t *testing.T) {
"fields" : [ "Id" ],
"message" : "Account ID: id value of incorrect type: 001900K0001pPuOAAU",
"errorCode" : "MALFORMED_ID"
}
}
]`
return &http.Response{
StatusCode: http.StatusBadRequest,
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package sfdc
import (
"net/http"

"github.com/g8rswimmer/go-sfdc/credentials"
"github.com/aheber/go-sfdc/credentials"
)

// Configuration is the structure for goforce sessions.
Expand Down
Loading