-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathutils.py
More file actions
140 lines (118 loc) · 4.29 KB
/
utils.py
File metadata and controls
140 lines (118 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from __future__ import annotations
import numpy as np
# Small positive constant used for numerical safety in this module: it is added
# to prices before taking logs, added to the sigma denominator when forming
# z-scores, and used as the clipping margin in logit().
EPS = 1e-12
# NOTE(review): not referenced in the visible part of this file. It equals the
# default `vol_window` of make_bins_from_price (7200) — presumably the minimum
# history length callers should require; confirm against callers.
MIN_REQUIRED_SAMPLES = 7200
def rolling_std_fast(x: np.ndarray, window: int) -> np.ndarray:
    """
    Rolling sample standard deviation over every full window of `x`.

    The result is computed from cumulative sums of the (globally mean-centred)
    values and their squares instead of looping over windows.

    Args:
        x: 1-D sequence of values; converted to float64.
        window: number of consecutive samples per window; must be >= 1.

    Returns:
        Array of length ``len(x) - window + 1`` whose element ``i`` is the
        standard deviation of ``x[i : i + window]`` with an ``n - 1``
        denominator. For ``window == 1`` the denominator is clamped to 1, so
        the result is 0 rather than NaN. An empty array is returned when
        ``len(x) < window``.

    Raises:
        ValueError: if `window` is not positive.
    """
    x = np.asarray(x, dtype=np.float64)
    n = int(x.shape[0])
    w = int(window)
    # A non-positive window would otherwise slice the prefix sums from the
    # wrong end and silently return meaningless values.
    if w <= 0:
        raise ValueError("window must be positive")
    if n < w:
        return np.full(0, np.nan)

    # Centre on the global mean first: the variance is shift-invariant and
    # this keeps the running sums small, reducing cancellation error.
    xc = x - np.mean(x)
    c1 = np.concatenate(([0.0], np.cumsum(xc)))
    c2 = np.concatenate(([0.0], np.cumsum(xc * xc)))

    # Window sums via differences of prefix sums.
    s1 = c1[w:] - c1[:-w]
    s2 = c2[w:] - c2[:-w]

    var = (s2 - (s1 * s1) / w) / max(w - 1, 1)
    # Round-off can push a tiny variance slightly below zero; clamp before sqrt.
    return np.sqrt(np.maximum(var, 0.0))
def sigma_from_price(
    price: np.ndarray,
    *,
    return_horizon_steps: int,
    vol_window: int,
    eps: float = EPS,
) -> np.ndarray:
    """
    Rolling sigma at time t computed from *past* log returns over `return_horizon_steps`.

    The return that ends at time ``t`` is ``log(price[t] + eps) - log(price[t - h] + eps)``
    with ``h = return_horizon_steps``. ``sigma[t]`` is the rolling standard
    deviation (see `rolling_std_fast`) of the `vol_window` most recent such
    returns ending at ``t``, so it only depends on prices up to and including ``t``.

    Args:
        price: 1-D price series.
        return_horizon_steps: lag (in steps) of each log return; must be positive.
        vol_window: number of returns in each rolling window; must be positive.
        eps: small offset added to prices before taking the log.

    Returns:
        An array of length T with NaN where sigma is not yet defined, i.e. for
        ``t < return_horizon_steps + vol_window - 1`` (all NaN if the series is
        too short).

    Raises:
        ValueError: if `price` is not 1-D, or if `return_horizon_steps` or
            `vol_window` is not positive.
    """
    price = np.asarray(price, dtype=float)
    if price.ndim != 1:
        raise ValueError("price_data must be 1-D array")
    h = int(return_horizon_steps)
    if h <= 0:
        raise ValueError("return_horizon_steps must be positive")
    w = int(vol_window)
    # Without this check a negative window yields silently wrong tail values.
    if w <= 0:
        raise ValueError("vol_window must be positive")

    T = int(price.shape[0])
    sigma_t = np.full(T, np.nan)
    if T <= h:
        return sigma_t

    log_price = np.log(price + eps)
    # past_ret[i] is the return that ends at time i + h.
    past_ret = log_price[h:] - log_price[:-h]

    sig_raw = rolling_std_fast(past_ret, w)
    if sig_raw.size > 0:
        # sig_raw[j] covers past_ret[j : j + w], whose last return ends at
        # time j + w - 1 + h; align it to that time index.
        sigma_t[h + w - 1 :] = sig_raw
    return sigma_t
def make_bins_from_price(
    price: np.ndarray,
    *,
    horizon_steps: int = 1,
    sigma_return_horizon_steps: int | None = None,
    vol_window: int = 7200,
    eps: float = EPS,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Label forward log returns by their size in units of trailing sigma.

    For each start index ``t`` the forward return
    ``log(price[t + horizon] + eps) - log(price[t] + eps)`` is divided by
    ``sigma(t) + eps`` (sigma from `sigma_from_price`) to give a z-score, which
    is binned as:

        0: z <= -2      1: -2 < z < -1      2: -1 <= z <= 1
        3: 1 < z < 2    4: z >= 2

    Args:
        price: 1-D price series.
        horizon_steps: forward horizon of the labelled return; must be positive.
        sigma_return_horizon_steps: lag of the returns used for sigma; defaults
            to `horizon_steps` when None. Must be positive.
        vol_window: rolling window length passed to `sigma_from_price`; must be
            positive.
        eps: small offset added to prices before the log and to sigma before
            the division.

    Returns:
        y: labels in 0..4 for valid indices
        valid_idx: start indices t where sigma(t) is defined and >0 and the
            forward return is finite
        Both arrays are empty when the series is too short or nothing is valid.

    Raises:
        ValueError: if `price` is not 1-D or any of the step/window arguments
            is not positive.
    """
    price = np.asarray(price, dtype=float)
    if price.ndim != 1:
        raise ValueError("price_data must be 1-D array")
    T = int(price.shape[0])
    horizon = int(horizon_steps)
    if horizon <= 0:
        raise ValueError("horizon_steps must be positive")
    sig_h = int(horizon if sigma_return_horizon_steps is None else sigma_return_horizon_steps)
    if sig_h <= 0:
        raise ValueError("sigma_return_horizon_steps must be positive")
    window = int(vol_window)
    if window <= 0:
        raise ValueError("vol_window must be positive")
    if T <= horizon:
        return np.zeros(0, dtype=int), np.zeros(0, dtype=int)

    # r[t] is the forward return starting at t; it is defined for t < T - horizon.
    r = np.log(price[horizon:] + eps) - np.log(price[:-horizon] + eps)
    sigma_t = sigma_from_price(price, return_horizon_steps=sig_h, vol_window=window, eps=eps)
    sigma_start = sigma_t[: r.shape[0]]

    idx_all = np.arange(r.shape[0], dtype=int)
    # The forward return must be finite too: a NaN/inf return (e.g. a missing
    # or non-positive future price) would match no bin below and silently keep
    # the zero initialiser, i.e. be mislabelled as class 0.
    valid_mask = np.isfinite(sigma_start) & (sigma_start > 0.0) & np.isfinite(r)
    valid_idx = idx_all[valid_mask]
    if valid_idx.size == 0:
        return np.zeros(0, dtype=int), np.zeros(0, dtype=int)

    z = r[valid_mask] / (sigma_start[valid_mask] + eps)
    # Class 0 (z <= -2) is the initial value; the other classes overwrite it.
    y = np.zeros_like(z, dtype=int)
    y[(z > -2.0) & (z < -1.0)] = 1
    y[(z >= -1.0) & (z <= 1.0)] = 2
    y[(z > 1.0) & (z < 2.0)] = 3
    y[z >= 2.0] = 4
    return y, valid_idx
def logit(p: np.ndarray) -> np.ndarray:
    """
    Log-odds ``log(p) - log(1 - p)`` of probabilities.

    Inputs are first clipped to ``[EPS, 1 - EPS]`` so that values of exactly
    0 or 1 give large finite results instead of infinities.
    """
    clipped = np.clip(p, EPS, 1.0 - EPS)
    log_p = np.log(clipped)
    log_not_p = np.log(1.0 - clipped)
    return log_p - log_not_p
def recent_mass_weights(t: np.ndarray, *, recent_samples: int, recent_mass: float) -> np.ndarray:
    """
    Two-level sample weights that put a fixed share of the total weight on recent samples.

    A sample is "recent" when its time value is within ``recent_samples - 1``
    of the maximum of `t`. All recent samples share one weight and all older
    samples share another, chosen so the recent group carries `recent_mass`
    of the total; the weights are then rescaled to sum to ``t.size``.

    Args:
        t: time values, one per sample (any order).
        recent_samples: width of the recent span in units of `t`; must be positive.
        recent_mass: fraction of total weight given to the recent group; must
            lie strictly between 0 and 1.

    Returns:
        float32 array with one weight per sample. Empty input gives an empty
        array (returned before the arguments are validated); if either group
        is empty, every weight is 1.

    Raises:
        ValueError: if `recent_samples` is not positive or `recent_mass` is
            outside (0, 1).
    """
    times = np.asarray(t, dtype=float)
    total = times.size
    if total == 0:
        return np.ones(0, dtype=np.float32)

    span = int(recent_samples)
    if span <= 0:
        raise ValueError("recent_samples must be positive")
    mass = float(recent_mass)
    if not (0.0 < mass < 1.0):
        raise ValueError("recent_mass must be in (0,1)")

    threshold = float(np.max(times)) - float(span - 1)
    is_recent = times >= threshold
    n_recent = int(np.sum(is_recent))
    n_old = int(total - n_recent)

    weights = np.ones(total, dtype=np.float32)
    if n_recent == 0 or n_old == 0:
        # Only one group present: nothing to rebalance.
        return weights

    # Old samples keep weight 1; solve n_recent * boost / (n_recent * boost + n_old) == mass.
    boost = mass * float(n_old) / ((1.0 - mass) * float(n_recent))
    weights[is_recent] = np.float32(boost)

    # Rescale so the mean weight is 1.
    return weights * (total / float(np.sum(weights)))