From aa816693158e5c770b6e12fefc7a0de33642dcfe Mon Sep 17 00:00:00 2001 From: scoiatael Date: Tue, 28 Feb 2017 17:06:15 +0100 Subject: [PATCH] Speed up by using single session Transactions: 31807 hits Availability: 97.32 % Elapsed time: 59.28 secs Data transferred: 0.25 MB Response time: 0.04 secs Transaction rate: 536.56 trans/sec Throughput: 0.00 MB/sec Concurrency: 20.38 Successful transactions: 31807 Failed transactions: 876 Longest transaction: 20.16 Shortest transaction: 0.00 --- Makefile | 7 +++++++ actions/migrate.go | 3 +-- actions/read_events.go | 4 +--- actions/write_event.go | 4 +--- actions_test.go | 4 ++++ config.go | 31 +++++++++++++++++++++++++------ persistence/persistence_test.go | 10 +++------- persistence/provider.go | 18 +++++++++++++++++- 8 files changed, 59 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 6f739f8..7d368ea 100644 --- a/Makefile +++ b/Makefile @@ -4,3 +4,10 @@ run: archai archai: $(shell find . -type f -regex .*go$) go build . + +siege: + bash scripts/siege.sh + +test: + go test + cd persistence && go test diff --git a/actions/migrate.go b/actions/migrate.go index 60b74f2..5d724e4 100644 --- a/actions/migrate.go +++ b/actions/migrate.go @@ -12,8 +12,7 @@ type Migrate struct{} // Run all migrations func (a Migrate) Run(c Context) error { - persistenceProvider := c.Persistence() - migrationSession, err := persistenceProvider.MigrationSession() + migrationSession, err := c.Persistence().MigrationSession() if err != nil { return errors.Wrap(err, "Obtaining session failed") } diff --git a/actions/read_events.go b/actions/read_events.go index c16d2cf..2166943 100644 --- a/actions/read_events.go +++ b/actions/read_events.go @@ -18,12 +18,10 @@ type ReadEvents struct { const minTimeuuid = "00000000-0000-1000-8080-808080808080" func (re *ReadEvents) Run(c Context) error { - persistenceProvider := c.Persistence() - session, err := persistenceProvider.Session() + session, err := c.Persistence().Session() if err != nil { return errors.Wrap(err, "Obtaining session failed") } - defer session.Close() if len(re.Cursor) == 0 { re.Cursor = minTimeuuid } diff --git a/actions/write_event.go b/actions/write_event.go index 26a20a3..cebeb5f 100644 --- a/actions/write_event.go +++ b/actions/write_event.go @@ -13,12 +13,10 @@ type WriteEvent struct { } func (we WriteEvent) Run(c Context) error { - persistenceProvider := c.Persistence() - session, err := persistenceProvider.Session() + session, err := c.Persistence().Session() if err != nil { return errors.Wrap(err, "Obtaining session failed") } - defer session.Close() we.Meta["version"] = c.Version() return errors.Wrap(session.WriteEvent(we.Stream, we.Payload, we.Meta), fmt.Sprintf("Error writing event to stream %s", we.Stream)) } diff --git a/actions_test.go b/actions_test.go index aa938c2..13f23f6 100644 --- a/actions_test.go +++ b/actions_test.go @@ -26,6 +26,10 @@ var _ = BeforeSuite(func() { config = Config{} config.Hosts = []string{"127.0.0.1"} config.Keyspace = testingKeyspace + err := config.Init() + if err != nil { + panic(err) + } }) var _ = AfterSuite(func() { diff --git a/config.go b/config.go index 58ef779..011ee9b 100644 --- a/config.go +++ b/config.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "github.com/scoiatael/archai/actions" @@ -10,9 +11,11 @@ import ( // Config is a context for all application actions. type Config struct { - Keyspace string - Hosts []string - Actions []actions.Action + Keyspace string + Hosts []string + Actions []actions.Action + provider persistence.Provider + initialized bool } func (c Config) HandleErr(err error) { @@ -21,13 +24,15 @@ func (c Config) HandleErr(err error) { func (c Config) Migrations() map[string]persistence.Migration { m := make(map[string]persistence.Migration) - m["create_events_table"] = persistence.CreateEventsTable + m["001_create_events_table"] = persistence.CreateEventsTable return m } func (c Config) Persistence() persistence.Provider { - provider := persistence.CassandraProvider{Hosts: c.Hosts, Keyspace: c.Keyspace} - return &provider + if !c.initialized { + panic(fmt.Errorf("Persistence not initialized!")) + } + return c.provider } // Version returns current version @@ -39,7 +44,21 @@ func (c Config) HttpHandler() actions.HttpHandler { return http.NewIris(c) } +func (c *Config) Init() error { + new_provider := persistence.CassandraProvider{Hosts: c.Hosts, Keyspace: c.Keyspace} + err := new_provider.Init() + if err != nil { + return err + } + c.provider = &new_provider + c.initialized = true + return nil +} + func (c Config) Run() error { + if err := c.Init(); err != nil { + return err + } for _, a := range c.Actions { err := a.Run(c) if err != nil { diff --git a/persistence/persistence_test.go b/persistence/persistence_test.go index c3bd53f..09ede4b 100644 --- a/persistence/persistence_test.go +++ b/persistence/persistence_test.go @@ -54,6 +54,9 @@ func randomString() string { var _ = BeforeSuite(func() { var err error provider = CassandraProvider{Hosts: []string{"127.0.0.1"}, Keyspace: testingKeyspace} + if err := provider.Init(); err != nil { + panic(err) + } cluster := provider.NewCluster() cluster.Consistency = gocql.All root_sess, err = cluster.CreateSession() @@ -100,10 +103,6 @@ var _ = Describe("Persistence", func() { Expect(err).NotTo(HaveOccurred()) }) - // AfterEach(func() { - // sess.Close() - // }) - Context("when there's no migration table", func() { It("creates migration table", func() { var ( @@ -162,9 +161,6 @@ var _ = Describe("Persistence", func() { sess, err = provider.Session() Expect(err).NotTo(HaveOccurred()) }) - AfterEach(func() { - sess.Close() - }) Context("when there are no events", func() { It("returns empty array", func() { es, err := sess.ReadEvents(randomString(), "00000000-0000-1000-8080-808080808080", 10) diff --git a/persistence/provider.go b/persistence/provider.go index e12d494..4dc32c2 100644 --- a/persistence/provider.go +++ b/persistence/provider.go @@ -16,13 +16,14 @@ type Provider interface { type CassandraProvider struct { Hosts []string Keyspace string + session *Session } func (cp *CassandraProvider) NewCluster() *gocql.ClusterConfig { return gocql.NewCluster(cp.Hosts...) } -func (cp *CassandraProvider) Session() (Session, error) { +func (cp *CassandraProvider) newSession() (Session, error) { cluster := cp.NewCluster() cluster.Keyspace = cp.Keyspace cluster.Consistency = gocql.Quorum @@ -30,6 +31,13 @@ func (cp *CassandraProvider) Session() (Session, error) { return &CassandraSession{sess}, errors.Wrap(err, "CreateSession failed") } +func (cp *CassandraProvider) Session() (Session, error) { + if cp.session != nil { + return *cp.session, nil + } + return nil, fmt.Errorf("Initialize CassandraProvider with NewProvider first") +} + const createKeySpace = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };` func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { @@ -50,3 +58,11 @@ func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { return &CassandraMigrationSession{sess}, errors.Wrap(err, "CreateSession failed") } + +func (c *CassandraProvider) Init() error { + new_sess, err := c.newSession() + if err == nil { + c.session = &new_sess + } + return err +}