Future Blog Post
Published:
Conformal prediction¶
import numpy as np
import matplotlib.pyplot as plt
1. Generate data¶
def f(x):
return np.cos(1.5 * np.pi * x)
x_plot = np.linspace(0, 1, 100)
y_plot = f(x_plot)
plt.plot(x_plot, y_plot)
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.show()
n_points = 500
x = np.random.uniform(0, 1, n_points)
y = f(x) + x * np.random.normal(0, 1, n_points)
plt.plot(x_plot, y_plot)
plt.scatter(x,y, s=0.5, color="r")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.show()
2. Split conformal prediction¶
idx = np.random.permutation(n_points)
n_half = int(np.floor(n_points/2))
idx_train, idx_cal = idx[:n_half], idx[n_half:2*n_half]
x_train = x[idx_train]
y_train = y[idx_train]
x_cal = x[idx_cal]
y_cal = y[idx_cal]
Fit a regressor on training data.
reg = np.poly1d(np.polyfit(x_train, y_train, 3))
plt.plot(x_plot, y_plot, label='true')
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
Now we compute scores.
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
predictions = reg(x_cal)
scores = np.abs(y_cal - predictions)
plt.hist(scores)
plt.show()
Compute quantile.
alpha = 0.1
quantile = np.quantile(scores, np.ceil((1-alpha)*(len(x_cal)+1))/len(x_cal))
quantile
1.0659046161527235
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.fill_between(x_plot, reg(x_plot) - quantile, reg(x_plot) + quantile, alpha=0.2)
plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
plt.scatter(x_train,y_train,s=0.5,color="black", label="training data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
n_test = 1000
x_test = np.random.uniform(0, 1, n_test)
y_test = f(x_test) + x_test * np.random.normal(0, 1, n_test)
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.fill_between(x_plot, reg(x_plot) - quantile, reg(x_plot) + quantile, alpha=0.2)
# plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
# plt.scatter(x_train,y_train,s=0.5,color="black", label="training data")
plt.scatter(x_test,y_test,s=0.5,color="g", label="test data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
coverage = np.sum((y_test >= reg(x_test)-quantile) & (y_test <= reg(x_test)+quantile)) / n_test
coverage
0.932
3. Improving local adaptivity¶
1. Quantile regression¶
def stack(x):
# use polynomials of degree 3 to predict quantiles
return np.column_stack((x, x**2, x**3))
import statsmodels.api as sm
quantiles = [alpha/2, 1-alpha/2]
quantile_models = []
x_train_stack = stack(x_train)
for quantile in quantiles:
# Fit quantile regression model
quantile_model = sm.QuantReg(y_train, sm.add_constant(x_train_stack)).fit(q=quantile)
quantile_models.append(quantile_model)
for i, quantile_model in enumerate(quantile_models):
print(f"Quantile: {quantiles[i]}, Coefficients:")
print(quantile_model.summary())
Quantile: 0.05, Coefficients:
QuantReg Regression Results
==============================================================================
Dep. Variable: y Pseudo R-squared: 0.4189
Model: QuantReg Bandwidth: 0.5274
Method: Least Squares Sparsity: 2.562
Date: Wed, 10 Apr 2024 No. Observations: 250
Time: 15:02:52 Df Residuals: 246
Df Model: 3
==============================================================================
coef std err t P>|t| [0.025 0.975]
------------------------------------------------------------------------------
const 1.0091 0.216 4.678 0.000 0.584 1.434
x1 -2.8041 1.873 -1.497 0.136 -6.492 0.884
x2 -9.4769 4.257 -2.226 0.027 -17.861 -1.092
x3 10.0692 2.799 3.597 0.000 4.555 15.583
==============================================================================
Quantile: 0.95, Coefficients:
QuantReg Regression Results
==============================================================================
Dep. Variable: y Pseudo R-squared: 0.2129
Model: QuantReg Bandwidth: 0.5630
Method: Least Squares Sparsity: 2.482
Date: Wed, 10 Apr 2024 No. Observations: 250
Time: 15:02:52 Df Residuals: 246
Df Model: 3
==============================================================================
coef std err t P>|t| [0.025 0.975]
------------------------------------------------------------------------------
const 0.9978 0.169 5.894 0.000 0.664 1.331
x1 1.6931 1.379 1.228 0.221 -1.023 4.409
x2 -12.5424 3.069 -4.086 0.000 -18.588 -6.497
x3 11.7889 1.928 6.115 0.000 7.991 15.586
==============================================================================
x_cal_stack = stack(x_cal)
pred_lower = quantile_models[0].predict(sm.add_constant(x_cal_stack))
pred_upper = quantile_models[1].predict(sm.add_constant(x_cal_stack))
plt.plot(x_plot, y_plot, label='true function', color="black")
plt.scatter(x_cal, pred_lower, s=0.5, label='lower quantile prediction', color="g")
plt.scatter(x_cal, pred_upper, s=0.5, label='upper quantile prediction', color="r")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
scores = np.maximum(pred_lower - y_cal, y_cal - pred_upper)
plt.hist(scores)
plt.show()
quantile = np.quantile(scores, np.ceil((1-alpha)*(len(x_cal)+1))/len(x_cal))
quantile
0.014922968585278196
plt.plot(x_plot,y_plot, label='true')
x_plot_stack = stack(x_plot)
q_lower_plot = quantile_models[0].predict(sm.add_constant(x_plot_stack))
q_upper_plot = quantile_models[1].predict(sm.add_constant(x_plot_stack))
plt.fill_between(x_plot, q_lower_plot - quantile, q_upper_plot + quantile, alpha=0.2)
# plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
# plt.scatter(x_train,y_train,s=0.5,color="black", label="training data")
plt.scatter(x_plot, q_lower_plot, s=0.5, label='lower quantile estimation', color="b")
plt.scatter(x_plot, q_upper_plot, s=0.5, label='upper quantile estimation', color="c")
plt.scatter(x_test,y_test,s=0.5,color="g", label="test data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
x_test_stack = stack(x_test)
pred_lower_test = quantile_models[0].predict(sm.add_constant(x_test_stack))
pred_upper_test = quantile_models[1].predict(sm.add_constant(x_test_stack))
coverage = np.sum((y_test >= pred_lower_test-quantile) & (y_test <= pred_upper_test+quantile)) / n_test
coverage
0.927
