-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMulti-Regression Strategy
210 lines (173 loc) · 8.92 KB
/
Multi-Regression Strategy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// This Pine Script™ code is subject to the terms of the Mozilla Public License 2.0 at https://mozilla.org/MPL/2.0/
// © gotbeatz26107
//@version=5
// Strategy declaration: drawn on the price chart, orders processed on bar close,
// percent-based commission, each order sized as a percentage of equity.
strategy(title = "Multi-Regression Strategy",
  overlay = true,
  process_orders_on_close = true,
  initial_capital = 1000,
  commission_type = strategy.commission.percent,
  commission_value = 0.02,
  default_qty_type = strategy.percent_of_equity,
  default_qty_value = 20,
  slippage = 2,
  use_bar_magnifier = true)
// User Inputs
// Group titles used to organise the inputs in the settings dialog.
g_regression = "Regression Settings"
g_ridge = "Ridge Regression Settings"
g_loess = "LOESS Settings"
g_risk = "Risk Management"

// Regression model and lookback window.
regressionType = input.string(defval = "Linear", title = "Regression Type", options = ["Linear", "Ridge", "LOESS"], group = g_regression, tooltip = "Choose the type of regression to use")
length = input.int(defval = 90, title = "Length", minval = 5, group = g_regression, tooltip = "Number of bars used for regression calculation")

// Ridge-only parameter.
lambda = input.float(defval = 0.2, title = "Ridge Lambda", minval = 0.0, step = 0.1, group = g_ridge, tooltip = "Regularization parameter for Ridge regression")

// LOESS-only parameters.
point_span = input.int(defval = 30, title = "LOESS Point Span", minval = 2, group = g_loess, tooltip = "Number of nearest neighbors used in local regression")
adaptive = input.bool(defval = false, title = "Adaptive", group = g_loess, tooltip = "Use adaptive weights based on the data density")
exp = input.float(defval = 3.0, title = "Exponent", minval = 0.1, group = g_loess, tooltip = "Exponent for the adaptive weighting function")

// Stop-loss / take-profit switches and the volatility measure that sizes the bands.
use_sl = input.bool(defval = true, title = "Use Stop Loss", group = g_risk, tooltip = "Enable stop loss")
use_tp = input.bool(defval = true, title = "Use Take Profit", group = g_risk, tooltip = "Enable take profit")
risk_type = input.string(defval = "ATR", title = "Risk Measure", options = ["ATR", "StdDev", "BB Width", "KC Width", "Chaikin Volatility", "Historical Volatility", "Ulcer Index", "ATRP", "KAMA Efficiency Ratio"], group = g_risk, tooltip = "Choose the volatility measure for risk calculation")
risk_mult = input.float(defval = 1.0, title = "Risk Multiplier", minval = 0.1, step = 0.1, group = g_risk, tooltip = "Multiplier for the risk measure to set stop loss and take profit levels")
// Helper Functions
// Returns `value` raised to the power 2.
square(float value) =>
    float result = math.pow(value, 2.0)
    result
// Returns `value` raised to the power 3.
cube(float value) =>
    float result = math.pow(value, 3.0)
    result
// Tricube kernel weight: 0 when `value` exceeds 1, otherwise (1 - value^3)^3.
// `value` is expected to be a non-negative scaled distance; the only caller in this
// file passes an absolute distance divided by the point span.
tricubic(float value) =>
    if value > 1.0
        0.0
    else
        float inner = 1.0 - cube(value)
        cube(inner)
// Generalised kernel weight: 0 when `value` exceeds 1, otherwise
// (1 - value^exponent)^exponent. With exponent 3 this matches the tricube form.
polyweight(float value, float exponent) =>
    if value > 1.0
        0.0
    else
        float inner = 1.0 - math.pow(value, exponent)
        math.pow(inner, exponent)
// Regression Functions
// Built-in linear regression of `close` over the last `length` bars, with offset 0.
linearRegression() =>
    ta.linreg(source = close, length = length, offset = 0)
// Ridge-regularised straight-line fit of `close` over the last `length` bars.
//
// x runs from 0 (oldest bar in the window) to 1 (current bar); y is the close of
// the matching bar. The slope is the least-squares slope with `lambda * n` added
// to its denominator. Returns the fitted value at the newest x.
//
// @param lambda  Regularisation strength added (times n) to the slope denominator.
// @returns       Fitted value at the current bar.
ridgeRegression(float lambda) =>
    float n = float(length)
    float sx = 0.0
    float sy = 0.0
    float sxx = 0.0
    float sxy = 0.0
    // Single pass, oldest sample first, accumulating the four sums the fit needs.
    for k = 0 to length - 1
        float xk = float(k) / (length - 1)
        float yk = close[length - 1 - k]
        sx += xk
        sy += yk
        sxx += xk * xk
        sxy += xk * yk
    float ridgeDenominator = (n * sxx - sx * sx + lambda * n)
    float slope = (n * sxy - sx * sy) / ridgeDenominator
    float intercept = (sy - slope * sx) / n
    // x-coordinate of the newest sample (index length - 1 after normalisation).
    float xNewest = float(length - 1) / (length - 1)
    intercept + slope * xNewest
// Locally weighted linear regression (LOESS), evaluated at a single sample.
//
// Only the fit at index `array.size(sample_y) - 1` is ever returned, so only that
// index is fitted. The previous version fitted every index into a scratch array and
// then read one element; each fit is independent of the others, so the returned
// value is the same while the per-bar work drops from O(n * span) to O(span).
//
// @param sample_x     x-coordinates of the samples.
// @param sample_y     y-values of the samples, parallel to `sample_x`.
// @param point_span   Number of neighbours taken on each side of the target index;
//                     also the scale that distances are divided by before weighting.
// @param is_adaptive  true: weight with polyweight(d, exponent); false: tricubic(d).
// @param exponent     Exponent forwarded to polyweight when `is_adaptive` is true.
// @returns            Fitted value at the target index.
loess(int[] sample_x, float[] sample_y, int point_span, bool is_adaptive, float exponent) =>
    int _last_x = array.size(sample_x) - 1
    int _target = array.size(sample_y) - 1
    int _xi = array.get(sample_x, _target)
    // Neighbourhood window, clamped to the valid index range of sample_x.
    int _start = math.max(0, _target - point_span)
    int _end = math.min(_target + point_span, _last_x)
    float _sum_w = 0.0
    float _sum_x = 0.0
    float _sum_x2 = 0.0
    float _sum_y = 0.0
    float _sum_xy = 0.0
    for _s = _start to _end
        int _xs = array.get(sample_x, _s)
        float _ys = array.get(sample_y, _s)
        int _ds = math.abs(_xs - _xi)
        float _scaled = float(_ds) / float(point_span)
        float _ws = is_adaptive ? polyweight(_scaled, exponent) : tricubic(_scaled)
        float _wxs = _ws * float(_xs)
        _sum_w += _ws
        _sum_x += _wxs
        _sum_x2 += _wxs * float(_xs)
        _sum_y += _ws * _ys
        _sum_xy += _wxs * _ys
    // Weighted means; the target itself has distance 0, so it contributes weight 1.
    float _mean_x = _sum_x / _sum_w
    float _mean_y = _sum_y / _sum_w
    float _mean_xy = _sum_xy / _sum_w
    float _mean_x2 = _sum_x2 / _sum_w
    // Weighted variance of x; a zero variance falls back to a flat (slope 0) fit.
    float _mx2xx = _mean_x2 - square(_mean_x)
    float _beta = _mx2xx == 0.0 ? 0.0 : (_mean_xy - _mean_x * _mean_y) / _mx2xx
    float _alpha = _mean_y - _beta * _mean_x
    _alpha + _beta * float(_xi)
// Dispatches to the regression model named by `_type`.
//
// "Linear" and "Ridge" call their dedicated functions. "LOESS" builds x = 0..length-1
// and y = the matching closes (oldest first) and passes them to loess() together
// with the LOESS inputs. Any other name yields na.
//
// @param _type  One of "Linear", "Ridge", "LOESS".
// @returns      Regression value for the current bar, or na for an unknown type.
regression(string _type) =>
    float fitted = na
    if _type == "Linear"
        fitted := linearRegression()
    else if _type == "Ridge"
        fitted := ridgeRegression(lambda)
    else if _type == "LOESS"
        int[] xs = array.new_int(length)
        float[] ys = array.new_float(length)
        for k = 0 to length - 1
            array.set(xs, k, k)
            array.set(ys, k, close[length - 1 - k])
        fitted := loess(xs, ys, point_span, adaptive, exp)
    fitted
// Regression value of the selected model for the current bar.
float regValue = regression(regressionType)

// Risk Management Calculations
// All measures below use the same `length` lookback as the regression.
atr = ta.atr(length)
stdev = ta.stdev(source = close, length = length)

// Bollinger Bands Width: distance between the bands relative to the SMA basis.
bb_mult = 2.0
basis = ta.sma(source = close, length = length)
dev = ta.stdev(source = close, length = length)
bb_offset = bb_mult * dev
upper = basis + bb_offset
lower = basis - bb_offset
bb_width = (upper - lower) / basis

// Keltner Channels Width: distance between the channels relative to the EMA basis.
kc_mult = 1.5
kc_basis = ta.ema(source = close, length = length)
kc_range = ta.atr(length)
kc_offset = kc_mult * kc_range
kc_upper = kc_basis + kc_offset
kc_lower = kc_basis - kc_offset
kc_width = (kc_upper - kc_lower) / kc_basis

// Chaikin Volatility: EMA of the bar range, expressed as a fraction of close.
bar_range = high - low
chaikin_vol = ta.ema(source = bar_range, length = length)
chaikin_vol_norm = chaikin_vol / close

// Historical Volatility: standard deviation of log returns scaled by sqrt(252).
// NOTE(review): sqrt(252) looks like a daily-bar annualisation factor - confirm for other timeframes.
returns = math.log(close / close[1])
returns_dev = ta.stdev(source = returns, length = length)
hist_vol = returns_dev * math.sqrt(252)
// Ulcer Index
// Ulcer Index of `src` over the last `len` bars: the root-mean-square of each bar's
// percentage drawdown from the running peak inside the window.
//
// The window is walked in chronological order (oldest bar first) so the running
// peak only contains bars at or before the bar being measured. Walking newest to
// oldest would compare each bar with the maximum of *later* bars, which scores
// rallies as drawdown and reports 0 for a steadily falling series.
//
// @param src  Source series.
// @param len  Number of bars in the window.
// @returns    Ulcer Index in percent (0 when the window never trades below its running peak).
f_ulcer(src, len) =>
    float sum_sq = 0.0
    // Seed the peak with the oldest bar in the window.
    float peak = src[len - 1]
    // `from` > `to`, so the loop counts down: i = len - 1 (oldest) ... 0 (current bar).
    for i = len - 1 to 0
        peak := math.max(peak, src[i])
        float percent_drawdown = (src[i] - peak) / peak * 100
        sum_sq += math.pow(percent_drawdown, 2)
    math.sqrt(sum_sq / len)
ulcer_index = f_ulcer(close, length)

// ATRP: ATR as a fraction of close.
atrp = ta.atr(length) / close

// KAMA Efficiency Ratio: net move over `length` bars divided by the sum of the
// absolute bar-to-bar moves over the same span.
change = math.abs(close - close[length])
bar_move = math.abs(close - close[1])
volatility = math.sum(bar_move, length)
er = change / volatility

// Select risk measure and normalize: each branch yields a unitless fraction.
base_risk_distance = switch risk_type
    "ATR" => atr / close
    "StdDev" => stdev / close
    "BB Width" => bb_width
    "KC Width" => kc_width
    "Chaikin Volatility" => chaikin_vol_norm
    "Historical Volatility" => hist_vol
    "Ulcer Index" => ulcer_index / 100
    "ATRP" => atrp
    "KAMA Efficiency Ratio" => er

// Apply risk multiplier
risk_distance_percent = base_risk_distance * risk_mult
// Bands around the regression value, offset by the scaled risk distance.
float upper_bound = regValue * (1 + risk_distance_percent)
float lower_bound = regValue * (1 - risk_distance_percent)

// Plotting: regression line in white, upper band in teal, lower band in red.
plot(series = regValue, title = "Regression Value", linewidth = 2, color = color.white)
plot(series = upper_bound, title = "Upper Bound", linewidth = 1, color = color.teal)
plot(series = lower_bound, title = "Lower Bound", linewidth = 1, color = color.red)
// Trading Logic
// Entry signals: close crossing the regression value.
longCondition = ta.crossover(close, regValue)
shortCondition = ta.crossunder(close, regValue)
// Take-profit signals: close crossing back under the upper band (longs) or back
// over the lower band (shorts). Evaluated on every bar, consumed only when use_tp is on.
bool longTakeProfit = ta.crossunder(close, upper_bound)
bool shortTakeProfit = ta.crossover(close, lower_bound)

// Execute trades: close the opposite position, then enter in the signal direction.
if longCondition
    strategy.close(id = "Short")
    strategy.entry(id = "Long", direction = strategy.long)
else if shortCondition
    strategy.close(id = "Long")
    strategy.entry(id = "Short", direction = strategy.short)

// Risk management
// Stop loss: re-issued every bar so the stop level follows the current band.
if use_sl
    strategy.exit(id = "Long SL", from_entry = "Long", stop = lower_bound)
    strategy.exit(id = "Short SL", from_entry = "Short", stop = upper_bound)

// Take profit: close the position when its take-profit signal fires.
if use_tp
    if longTakeProfit
        strategy.close(id = 'Long')
    if shortTakeProfit
        strategy.close(id = 'Short')