feat(sql): Optimize executions #4804
base: master
Changes from all commits
d8d3799
213b308
4aee645
471f2f9
77d0b12
3b3894b
ddf724e
@@ -53,8 +53,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu
 import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException
 import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException
 import de.huxhorn.sulky.ulid.SpinULID
-import java.lang.System.currentTimeMillis
-import java.security.SecureRandom
 import java.time.Duration
 import org.jooq.DSLContext
 import org.jooq.DatePart
@@ -80,7 +78,33 @@ import org.jooq.impl.DSL.value
 import org.slf4j.LoggerFactory
 import rx.Observable
 import java.io.ByteArrayOutputStream
+import java.lang.System.currentTimeMillis
 import java.nio.charset.StandardCharsets
+import java.security.SecureRandom
+import java.util.stream.Collectors.toList
+import kotlin.collections.Collection
+import kotlin.collections.Iterable
+import kotlin.collections.Iterator
+import kotlin.collections.List
+import kotlin.collections.Map
+import kotlin.collections.MutableList
+import kotlin.collections.chunked
+import kotlin.collections.distinct
+import kotlin.collections.firstOrNull
+import kotlin.collections.forEach
+import kotlin.collections.isEmpty
+import kotlin.collections.isNotEmpty
+import kotlin.collections.listOf
+import kotlin.collections.map
+import kotlin.collections.mapOf
+import kotlin.collections.mutableListOf
+import kotlin.collections.mutableMapOf
+import kotlin.collections.plus
+import kotlin.collections.set
+import kotlin.collections.toList
+import kotlin.collections.toMutableList
+import kotlin.collections.toMutableMap
+import kotlin.collections.toTypedArray

 /**
  * A generic SQL [ExecutionRepository].
@@ -427,6 +451,129 @@ class SqlExecutionRepository(
    )
  }
  override fun retrievePipelineConfigIdsForApplication(application: String): List<String> =
    withPool(poolName) {
      return jooq.selectDistinct(field("config_id"))
        .from(PIPELINE.tableName)
        .where(field("application").eq(application))
        .fetch(0, String::class.java)
    }
  /**
   * This function currently supports the following ExecutionCriteria:
   * 'limit', a.k.a. page size, and
   * 'statuses'.
   *
   * It determines which pipeline executions satisfy the above ExecutionCriteria and returns
   * a list of all these execution ids, by executing one of the following queries:
   *
   * - If the execution criteria do not contain any statuses:
   *
   *     SELECT config_id, id
   *     FROM pipelines force index (`pipeline_application_idx`)
   *     WHERE application = "myapp"
   *     ORDER BY config_id;
   *
   * - If the execution criteria contain statuses:
   *
   *     SELECT config_id, id
   *     FROM pipelines force index (`pipeline_application_status_starttime_idx`)
   *     WHERE (
   *       application = "myapp" and
   *       status in ("status1", "status2")
   *     )
   *     ORDER BY config_id;
   *
   * It then applies the 'limit' execution criteria to the result set obtained above, in code.
   * We observed load issues in the db when running a query where the limit was calculated in
   * the query itself. Therefore, we are moving that logic into the code below to ease the
   * burden on the db in such circumstances.
   */
  override fun retrieveAndFilterPipelineExecutionIdsForApplication(
    application: String,
    pipelineConfigIds: List<String>,
    criteria: ExecutionCriteria
  ): List<String> {

    // baseQueryPredicate for the flow where there are no statuses in the execution criteria
    var baseQueryPredicate = field("application").eq(application)
      .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray()))
    var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
      else PIPELINE.tableName

Review comment: Question: do we really want to force an index, or is another index on the combination needed? Or perhaps merging a couple of these indexes? Not sure of the best solution, but we have a number of indexes with some of this data, and I wonder if we couldn't do this a touch cleaner. I'd expect the optimizer to handle most of this without the need to force an index.
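A minimal sketch of the alternative the comment hints at, assuming a hypothetical composite index; the index name, column order, and DDL are illustrative assumptions, not part of this PR:

    // Hypothetical alternative, NOT part of this PR: add a composite index
    // covering the filter and sort columns, then drop the forceIndex hint and
    // let the optimizer choose the index itself.
    //
    //   CREATE INDEX pipeline_application_config_id_idx
    //     ON pipelines (application, config_id);
    //
    val table = PIPELINE.tableName // no forceIndex(...) hint

    val idsByConfig = jooq.select(field("config_id"), field("id"))
      .from(table)
      .where(
        field("application").eq(application)
          .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray()))
      )
      .orderBy(field("config_id"))
      .fetch()
      .intoGroups("config_id", "id")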
    // baseQueryPredicate for the flow with statuses
    if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) {
      val statusStrings = criteria.statuses.map { it.toString() }
      baseQueryPredicate = baseQueryPredicate
        .and(field("status").`in`(*statusStrings.toTypedArray()))

      table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx")
        else PIPELINE.tableName
    }
    val finalResult: MutableList<String> = mutableListOf()

    withPool(poolName) {
      val baseQuery = jooq.select(field("config_id"), field("id"))
        .from(table)
        .where(baseQueryPredicate)
        .orderBy(field("config_id"))
        .fetch().intoGroups("config_id", "id")

      baseQuery.forEach {
        val count = it.value.size
        if (criteria.pageSize < count) {
          finalResult.addAll(it.value
            .stream()
            .skip((count - criteria.pageSize).toLong())
            .collect(toList()) as List<String>
          )
        } else {
          finalResult.addAll(it.value as List<String>)
        }
      }
    }
    return finalResult
  }
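To make the intended call pattern concrete, a minimal caller sketch, assuming ExecutionCriteria's fluent setters; the application name, page size, and statuses are illustrative:

    // Illustrative caller, not part of this PR: resolve the config ids for an
    // application, then keep at most pageSize execution ids per config id.
    val criteria = ExecutionCriteria()
      .setPageSize(10) // 'limit': at most the 10 most recent executions per pipeline config
      .setStatuses(listOf("RUNNING", "SUCCEEDED"))

    val configIds = repository.retrievePipelineConfigIdsForApplication("myapp")
    val executionIds = repository.retrieveAndFilterPipelineExecutionIdsForApplication(
      "myapp",
      configIds,
      criteria
    )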
  /**
   * It executes the following query to get execution details for n executions at a time in a
   * specific application:
   *
   *     SELECT id, body, compressed_body, compression_type, `partition`
   *     FROM pipelines force index (`pipeline_application_idx`)
   *     left outer join pipelines_compressed_executions using (`id`)
   *     WHERE (
   *       application = "<myapp>" and
   *       id in ('id1', 'id2', 'id3')
   *     );
   *
   * It then gets all the stage information for all the executions returned from the above query.
   */
  override fun retrievePipelineExecutionDetailsForApplication(
    application: String,
    pipelineExecutions: List<String>,
    queryTimeoutSeconds: Int
  ): Collection<PipelineExecution> {
    withPool(poolName) {
      val baseQuery = jooq.select(selectExecutionFields(compressionProperties))
        .from(
          if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx")
          else PIPELINE.tableName
        )
        .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id"))
        .where(
          field("application").eq(application)
            .and(field("id").`in`(*pipelineExecutions.toTypedArray()))
        )
        .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever
        .fetch()

      log.debug("getting stage information for all the executions found so far")
      return ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled)
        .map(baseQuery.intoResultSet(), jooq)
    }
  }
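Because this method fetches details for n executions at a time, a caller would typically chunk the id list first (note the kotlin.collections.chunked import added above). A minimal sketch; the batch size, timeout, and variable names are assumptions:

    // Illustrative caller, not part of this PR: hydrate execution details in
    // batches so no single query has to load every execution body at once.
    val batchSize = 50      // assumed batch size; tune for your db
    val timeoutSeconds = 60 // assumed per-query timeout

    val executions = mutableListOf<PipelineExecution>()
    executionIds.chunked(batchSize).forEach { batch ->
      executions.addAll(
        repository.retrievePipelineExecutionDetailsForApplication("myapp", batch, timeoutSeconds)
      )
    }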
  override fun retrievePipelinesForPipelineConfigId(
    pipelineConfigId: String,
    criteria: ExecutionCriteria
Review comment: Minor, but this ordering does matter on the selection criteria, and I don't think we have a test for it (aka the primary must come first or it could potentially select previous data). Just an observation that this is missing a test, though I'm not sure how critical that test really is.