Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Speed up by using single session
Browse files Browse the repository at this point in the history
Transactions:		       31807 hits
Availability:		       97.32 %
Elapsed time:		       59.28 secs
Data transferred:	        0.25 MB
Response time:		        0.04 secs
Transaction rate:	      536.56 trans/sec
Throughput:		        0.00 MB/sec
Concurrency:		       20.38
Successful transactions:       31807
Failed transactions:	         876
Longest transaction:	       20.16
Shortest transaction:	        0.00
  • Loading branch information
scoiatael committed Feb 28, 2017
1 parent c5062ac commit aa81669
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 22 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@ run: archai

archai: $(shell find . -type f -regex .*go$)
go build .

siege:
bash scripts/siege.sh

test:
go test
cd persistence && go test
3 changes: 1 addition & 2 deletions actions/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ type Migrate struct{}

// Run all migrations
func (a Migrate) Run(c Context) error {
persistenceProvider := c.Persistence()
migrationSession, err := persistenceProvider.MigrationSession()
migrationSession, err := c.Persistence().MigrationSession()
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
Expand Down
4 changes: 1 addition & 3 deletions actions/read_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ type ReadEvents struct {
const minTimeuuid = "00000000-0000-1000-8080-808080808080"

func (re *ReadEvents) Run(c Context) error {
persistenceProvider := c.Persistence()
session, err := persistenceProvider.Session()
session, err := c.Persistence().Session()
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
defer session.Close()
if len(re.Cursor) == 0 {
re.Cursor = minTimeuuid
}
Expand Down
4 changes: 1 addition & 3 deletions actions/write_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ type WriteEvent struct {
}

func (we WriteEvent) Run(c Context) error {
persistenceProvider := c.Persistence()
session, err := persistenceProvider.Session()
session, err := c.Persistence().Session()
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
defer session.Close()
we.Meta["version"] = c.Version()
return errors.Wrap(session.WriteEvent(we.Stream, we.Payload, we.Meta), fmt.Sprintf("Error writing event to stream %s", we.Stream))
}
4 changes: 4 additions & 0 deletions actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var _ = BeforeSuite(func() {
config = Config{}
config.Hosts = []string{"127.0.0.1"}
config.Keyspace = testingKeyspace
err := config.Init()
if err != nil {
panic(err)
}
})

var _ = AfterSuite(func() {
Expand Down
31 changes: 25 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"log"

"github.com/scoiatael/archai/actions"
Expand All @@ -10,9 +11,11 @@ import (

// Config is a context for all application actions.
type Config struct {
Keyspace string
Hosts []string
Actions []actions.Action
Keyspace string
Hosts []string
Actions []actions.Action
provider persistence.Provider
initialized bool
}

func (c Config) HandleErr(err error) {
Expand All @@ -21,13 +24,15 @@ func (c Config) HandleErr(err error) {

func (c Config) Migrations() map[string]persistence.Migration {
m := make(map[string]persistence.Migration)
m["create_events_table"] = persistence.CreateEventsTable
m["001_create_events_table"] = persistence.CreateEventsTable
return m
}

func (c Config) Persistence() persistence.Provider {
provider := persistence.CassandraProvider{Hosts: c.Hosts, Keyspace: c.Keyspace}
return &provider
if !c.initialized {
panic(fmt.Errorf("Persistence not initialized!"))
}
return c.provider
}

// Version returns current version
Expand All @@ -39,7 +44,21 @@ func (c Config) HttpHandler() actions.HttpHandler {
return http.NewIris(c)
}

func (c *Config) Init() error {
new_provider := persistence.CassandraProvider{Hosts: c.Hosts, Keyspace: c.Keyspace}
err := new_provider.Init()
if err != nil {
return err
}
c.provider = &new_provider
c.initialized = true
return nil
}

func (c Config) Run() error {
if err := c.Init(); err != nil {
return err
}
for _, a := range c.Actions {
err := a.Run(c)
if err != nil {
Expand Down
10 changes: 3 additions & 7 deletions persistence/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func randomString() string {
var _ = BeforeSuite(func() {
var err error
provider = CassandraProvider{Hosts: []string{"127.0.0.1"}, Keyspace: testingKeyspace}
if err := provider.Init(); err != nil {
panic(err)
}
cluster := provider.NewCluster()
cluster.Consistency = gocql.All
root_sess, err = cluster.CreateSession()
Expand Down Expand Up @@ -100,10 +103,6 @@ var _ = Describe("Persistence", func() {
Expect(err).NotTo(HaveOccurred())
})

// AfterEach(func() {
// sess.Close()
// })

Context("when there's no migration table", func() {
It("creates migration table", func() {
var (
Expand Down Expand Up @@ -162,9 +161,6 @@ var _ = Describe("Persistence", func() {
sess, err = provider.Session()
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
sess.Close()
})
Context("when there are no events", func() {
It("returns empty array", func() {
es, err := sess.ReadEvents(randomString(), "00000000-0000-1000-8080-808080808080", 10)
Expand Down
18 changes: 17 additions & 1 deletion persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,28 @@ type Provider interface {
type CassandraProvider struct {
Hosts []string
Keyspace string
session *Session
}

func (cp *CassandraProvider) NewCluster() *gocql.ClusterConfig {
return gocql.NewCluster(cp.Hosts...)
}

func (cp *CassandraProvider) Session() (Session, error) {
func (cp *CassandraProvider) newSession() (Session, error) {
cluster := cp.NewCluster()
cluster.Keyspace = cp.Keyspace
cluster.Consistency = gocql.Quorum
sess, err := cluster.CreateSession()
return &CassandraSession{sess}, errors.Wrap(err, "CreateSession failed")
}

func (cp *CassandraProvider) Session() (Session, error) {
if cp.session != nil {
return *cp.session, nil
}
return nil, fmt.Errorf("Initialize CassandraProvider with NewProvider first")
}

const createKeySpace = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`

func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {
Expand All @@ -50,3 +58,11 @@ func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {

return &CassandraMigrationSession{sess}, errors.Wrap(err, "CreateSession failed")
}

func (c *CassandraProvider) Init() error {
new_sess, err := c.newSession()
if err == nil {
c.session = &new_sess
}
return err
}

0 comments on commit aa81669

Please sign in to comment.