Skip to content

Commit

Permalink
refactor(dao) base_dao API change for execute()
Browse files Browse the repository at this point in the history
Replace confusing `_execute()` by `execute()` for public usage, and
rename `_execute()` to `build_args_and_execute()`, for internal usage
only.
  • Loading branch information
thibaultcha committed Dec 17, 2015
1 parent bbfd502 commit a2ac0a6
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 152 deletions.
2 changes: 1 addition & 1 deletion kong/dao/cassandra/apis.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function Apis:find_all()
local apis = {}
local select_q = query_builder.select(self._table)

for rows, err in self:execute(select_q, nil, nil, {auto_paging = true}) do
for rows, err in self:execute(select_q, nil, {auto_paging = true}) do
if err then
return nil, err
elseif rows ~= nil then
Expand Down
247 changes: 123 additions & 124 deletions kong/dao/cassandra/base_dao.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,69 +24,80 @@ local error_types = constants.DATABASE_ERROR_TYPES

local BaseDao = Object:extend()

---
-- @local
-- Build the array of arguments to pass to lua-cassandra's `execute()` method.
-- Note:
-- Since this method only accepts an ordered list, we build this list from
-- the entity `t` and an (ordered) array of parameters for a query, taking
-- into account special cassandra values (uuid, timestamps, NULL).
-- @param[type=table] schema A schema with type properties to encode specific values.
-- @param[type=table] t Values to bind to a statement.
-- @param[type=table] parameters An ordered list of parameters.
-- @treturn table An ordered list of values to pass to lua-cassandra `execute()` args.
-- @treturn table Error Cassandra type validation errors
local function encode_cassandra_args(schema, t, args_keys)
local args_to_bind = {}
local errors

for _, column in ipairs(args_keys) do
local schema_field = schema.fields[column]
local arg = t[column]
--- Public interface.
-- Public methods developers can use in Kong core or in any plugin.
-- @section public

if schema_field.type == "id" and arg then
if validations.is_valid_uuid(arg) then
arg = cassandra.uuid(arg)
else
errors = utils.add_error(errors, column, arg.." is an invalid uuid")
local function page_iterator(self, session, query, args, query_options)
local iter = session:execute(query, args, query_options)
return function(query, previous_rows)
local rows, err, page = iter(query, previous_rows)
if rows == nil or err ~= nil then
session:set_keep_alive()
else
for i, row in ipairs(rows) do
rows[i] = self:_unmarshall(row)
end
elseif schema_field.type == "timestamp" and arg then
arg = cassandra.timestamp(arg)
elseif arg == nil then
arg = cassandra.unset
end
return rows, err, page
end, query
end

table.insert(args_to_bind, arg)
--- Execute a query.
-- This method should be called with the proper **args** formatting (as an array).
-- See `execute()` for building this parameter.
-- @see execute
-- @param query Plain string CQL query.
-- @param[type=table] args (Optional) Arguments to the query, as an array. Simply passed to lua-cassandra `execute()`.
-- @param[type=table] query_options (Optional) Options to give to lua-cassandra `execute()` query_options.
-- @param[type=string] keyspace (Optional) Override the keyspace for this query if specified.
-- @treturn table If the result consists of ROWS, a table with an array of unmarshalled rows and a `next_page` property if the results has a `paging_state`. If the result is of type "VOID", a boolean representing the success of the query. Otherwise, the raw result as given by lua-cassandra.
-- @treturn table An error if any during the execution.
function BaseDao:execute(query, args, query_options, keyspace)
local options = self._factory:get_session_options()
if keyspace then
options.keyspace = keyspace
end

return args_to_bind, errors
end
local session, err = cassandra.spawn_session(options)
if not session then
return nil, DaoError(err, constants.DATABASE_ERROR_TYPES.DATABASE)
end

--- Public interface.
-- Public methods developers can use in Kong core or in any plugin.
-- @section public
if query_options and query_options.auto_paging then
return page_iterator(self, session, query, args, query_options)
end

---
-- Bind a table of arguments to a query depending on the entity's schema,
-- and then execute the query via **:_execute()**.
-- @param[type=string] query The query to execute.
-- @param[type=table] columns A list of column names where each value indicates the column of the value at the same index in `args_to_bind`.
-- @param[type=table] args_to_bind Key/value table of arguments to bind.
-- @param[type=table] query_options Options to pass to lua-cassandra `execute()` query_options.
-- @return return values of `_execute()`.
-- @see _execute
function BaseDao:execute(query, columns, args_to_bind, query_options)
-- Build args array if operation has some
local args
if columns and args_to_bind then
local errors
args, errors = encode_cassandra_args(self._schema, args_to_bind, columns)
if errors then
return nil, DaoError(errors, error_types.INVALID_TYPE)
end
local results, err = session:execute(query, args, query_options)
if err then
err = DaoError(err, constants.DATABASE_ERROR_TYPES.DATABASE)
end

return self:_execute(query, args, query_options)
-- First, close the session (and underlying sockets)
session:set_keep_alive()

-- Parse result
if results and results.type == "ROWS" then
-- do we have more pages to fetch? if so, alias the paging_state
if results.meta.has_more_pages then
results.next_page = results.meta.paging_state
end

-- only the DAO needs those, it should be transparant in the application
results.meta = nil
results.type = nil

for i, row in ipairs(results) do
results[i] = self:_unmarshall(row)
end

return results, err
elseif results and results.type == "VOID" then
-- result is not a set of rows, let's return a boolean to indicate success
return err == nil, err
else
return results, err
end
end

--- Children DAOs interface.
Expand Down Expand Up @@ -140,7 +151,7 @@ function BaseDao:insert(t)
end

local insert_q, columns = query_builder.insert(self._table, t)
local _, stmt_err = self:execute(insert_q, columns, self:_marshall(t))
local _, stmt_err = self:build_args_and_execute(insert_q, columns, self:_marshall(t))
if stmt_err then
return nil, stmt_err
else
Expand Down Expand Up @@ -250,7 +261,7 @@ function BaseDao:update(t, full)

local update_q, columns = query_builder.update(self._table, t_no_primary_key, t_primary_key)

local _, stmt_err = self:execute(update_q, columns, self:_marshall(t))
local _, stmt_err = self:build_args_and_execute(update_q, columns, self:_marshall(t))
if stmt_err then
return nil, stmt_err
else
Expand All @@ -274,7 +285,7 @@ function BaseDao:find_by_primary_key(where_t)
end

local select_q, where_columns = query_builder.select(self._table, t_primary_key, self._column_family_details, nil, true)
local data, err = self:execute(select_q, where_columns, t_primary_key)
local data, err = self:build_args_and_execute(select_q, where_columns, t_primary_key)

-- Return the 1st and only element of the result set
if data and utils.table_size(data) > 0 then
Expand All @@ -291,13 +302,13 @@ end
-- 'WHERE' clause.
-- @param[type=table] where_t (Optional) columns/values table by which to find an entity.
-- @param[type=number] page_size Size of the page to retrieve (number of rows).
-- @param[type=string] paging_state Start page from given offset. See lua-cassandra's related **:execute()** option.
-- @param[type=string] paging_state Start page from given offset. See lua-cassandra's related `execute()` option.
-- @treturn table An array (of possible length 0) of entities as the result of the query
-- @treturn table An error if any
-- @treturn boolean A boolean indicating if the 'ALLOW FILTERING' clause was needed by the query
function BaseDao:find_by_keys(where_t, page_size, paging_state)
local select_q, where_columns, filtering = query_builder.select(self._table, where_t, self._column_family_details)
local res, err = self:execute(select_q, where_columns, where_t, {
local res, err = self:build_args_and_execute(select_q, where_columns, where_t, {
page_size = page_size,
paging_state = paging_state
})
Expand All @@ -314,7 +325,7 @@ end
-- @treturn boolean A boolean indicating if the 'ALLOW FILTERING' clause was needed by the query.
function BaseDao:count_by_keys(where_t, paging_state)
local count_q, where_columns, filtering = query_builder.count(self._table, where_t, self._column_family_details)
local res, err = self:execute(count_q, where_columns, where_t, {
local res, err = self:build_args_and_execute(count_q, where_columns, where_t, {
paging_state = paging_state
})
if err then
Expand Down Expand Up @@ -353,7 +364,7 @@ function BaseDao:delete(where_t)

local t_primary_key = extract_primary_key(where_t, self._primary_key, self._clustering_key)
local delete_q, where_columns = query_builder.delete(self._table, t_primary_key)
local results, err = self:execute(delete_q, where_columns, where_t)
local results, err = self:build_args_and_execute(delete_q, where_columns, where_t)
if err then
return false, err
end
Expand Down Expand Up @@ -435,81 +446,69 @@ function BaseDao:_unmarshall(t)
return t
end

local function page_iterator(self, session, query, args, query_options)
local iter = session:execute(query, args, query_options)
return function(query, previous_rows)
local rows, err, page = iter(query, previous_rows)
if rows == nil or err ~= nil then
session:set_keep_alive()
else
for i, row in ipairs(rows) do
rows[i] = self:_unmarshall(row)
end
end
return rows, err, page
end, query
end

--- Private methods.
-- For internal use in the base_dao itself or advanced usage in a child DAO.
-- @section private

--- Execute a query (internally).
-- This method should be called with the proper **args** formatting (as an array). See
-- **execute()** for building this parameter.
-- @see execute
--
-- @param query Plain string CQL query.
-- @param[type=table] args (Optional) Arguments to the query, as an array. Simply passed to lua-cassandra `execute()`.
-- @param[type=table] query_options (Optional) Options to give to lua-cassandra `execute()` query_options.
-- @param[type=string] keyspace (Optional) Override the keyspace for this query if specified.
-- @treturn table If the result consists of ROWS, a table with an array of unmarshalled rows and a `next_page` property if the results has a `paging_state`. If the result is of type "VOID", a boolean representing the success of the query. Otherwise, the raw result as given by lua-cassandra.
-- @treturn table An error if any during the execution.
function BaseDao:_execute(query, args, query_options, keyspace)
local options = self._factory:get_session_options()
if keyspace then
options.keyspace = keyspace
end
---
-- @local
-- Build the array of arguments to pass to lua-cassandra's `execute()` method.
-- Note:
-- Since this method only accepts an ordered list, we build this list from
-- the entity `t` and an (ordered) array of parameters for a query, taking
-- into account special cassandra values (uuid, timestamps, NULL).
-- @param[type=table] schema A schema with type properties to encode specific values.
-- @param[type=table] t Values to bind to a statement.
-- @param[type=table] parameters An ordered list of parameters.
-- @treturn table An ordered list of values to pass to lua-cassandra `execute()` args.
-- @treturn table Error Cassandra type validation errors
local function encode_cassandra_args(schema, t, args_keys)
local args_to_bind = {}
local errors

local session, err = cassandra.spawn_session(options)
if not session then
return nil, DaoError(err, constants.DATABASE_ERROR_TYPES.DATABASE)
end
for _, column in ipairs(args_keys) do
local schema_field = schema.fields[column]
local arg = t[column]

if query_options and query_options.auto_paging then
return page_iterator(self, session, query, args, query_options)
end
if schema_field.type == "id" and arg then
if validations.is_valid_uuid(arg) then
arg = cassandra.uuid(arg)
else
errors = utils.add_error(errors, column, arg.." is an invalid uuid")
end
elseif schema_field.type == "timestamp" and arg then
arg = cassandra.timestamp(arg)
elseif arg == nil then
arg = cassandra.unset
end

local results, err = session:execute(query, args, query_options)
if err then
err = DaoError(err, constants.DATABASE_ERROR_TYPES.DATABASE)
table.insert(args_to_bind, arg)
end

-- First, close the session (and underlying sockets)
session:set_keep_alive()

-- Parse result
if results and results.type == "ROWS" then
-- do we have more pages to fetch? if so, alias the paging_state
if results.meta.has_more_pages then
results.next_page = results.meta.paging_state
end

-- only the DAO needs those, it should be transparant in the application
results.meta = nil
results.type = nil
return args_to_bind, errors
end

for i, row in ipairs(results) do
results[i] = self:_unmarshall(row)
---
-- Bind a table of arguments to a query depending on the entity's schema,
-- and then execute the query via `execute()`.
-- @param[type=string] query The query to execute.
-- @param[type=table] columns A list of column names where each value indicates the column of the value at the same index in `args_to_bind`.
-- @param[type=table] args_to_bind Key/value table of arguments to bind.
-- @param[type=table] query_options Options to pass to lua-cassandra `execute()` query_options.
-- @return return values of `execute()`.
-- @see _execute
function BaseDao:build_args_and_execute(query, columns, args_to_bind, query_options)
-- Build args array if operation has some
local args
if columns and args_to_bind then
local errors
args, errors = encode_cassandra_args(self._schema, args_to_bind, columns)
if errors then
return nil, DaoError(errors, error_types.INVALID_TYPE)
end

return results, err
elseif results and results.type == "VOID" then
-- result is not a set of rows, let's return a boolean to indicate success
return err == nil, err
else
return results, err
end

return self:execute(query, args, query_options)
end

--- Perform "unique" check on a column.
Expand Down Expand Up @@ -599,7 +598,7 @@ function BaseDao:add_delete_hook(foreign_dao_name, foreign_column, parent_column
-- Rows need to be deleted by PRIMARY KEY, and we only have the value of the foreign key, hence we need
-- to retrieve all rows with the foreign key, and then delete them, identifier by their own primary key.
local select_q, columns = query_builder.select(foreign_dao._table, select_args, foreign_dao._column_family_details )
for rows, err in foreign_dao:execute(select_q, columns, select_args, {auto_paging = true}) do
for rows, err in foreign_dao:build_args_and_execute(select_q, columns, select_args, {auto_paging = true}) do
if err then
return false, err
end
Expand Down
Loading

0 comments on commit a2ac0a6

Please sign in to comment.