Future Blog Post
Published:
jupyter: kernelspec: display_name: Python 3 language: python name: python3 language_info: codemirror_mode: name: ipython version: 3 file_extension: .py mimetype: text/x-python name: python nbconvert_exporter: python pygments_lexer: ipython3 version: 3.11.9 nbformat: 4 nbformat_minor: 2 —
::: {.cell .markdown}
Conformal prediction
:::
::: {.cell .code execution_count=”1”}
import numpy as np
import matplotlib.pyplot as plt
:::
::: {.cell .markdown}
1. Generate data {#1-generate-data}
:::
::: {.cell .code execution_count=”2”}
def f(x):
return np.cos(1.5 * np.pi * x)
:::
::: {.cell .code execution_count=”3”}
x_plot = np.linspace(0, 1, 100)
y_plot = f(x_plot)
plt.plot(x_plot, y_plot)
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”4”}
n_points = 500
x = np.random.uniform(0, 1, n_points)
y = f(x) + x * np.random.normal(0, 1, n_points)
plt.plot(x_plot, y_plot)
plt.scatter(x,y, s=0.5, color="r")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .markdown}
2. Split conformal prediction {#2-split-conformal-prediction}
:::
::: {.cell .code execution_count=”5”}
idx = np.random.permutation(n_points)
n_half = int(np.floor(n_points/2))
idx_train, idx_cal = idx[:n_half], idx[n_half:2*n_half]
:::
::: {.cell .code execution_count=”6”}
x_train = x[idx_train]
y_train = y[idx_train]
x_cal = x[idx_cal]
y_cal = y[idx_cal]
:::
::: {.cell .markdown} Fit a regressor on training data. :::
::: {.cell .code execution_count=”7”}
reg = np.poly1d(np.polyfit(x_train, y_train, 3))
:::
::: {.cell .code execution_count=”8”}
plt.plot(x_plot, y_plot, label='true')
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .markdown} Now we compute scores. :::
::: {.cell .code execution_count=”9”}
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”10”}
predictions = reg(x_cal)
:::
::: {.cell .code execution_count=”11”}
scores = np.abs(y_cal - predictions)
:::
::: {.cell .code execution_count=”12”}
plt.hist(scores)
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .markdown} Compute quantile. :::
::: {.cell .code execution_count=”13”}
alpha = 0.1
quantile = np.quantile(scores, np.ceil((1-alpha)*(len(x_cal)+1))/len(x_cal))
quantile
::: {.output .execute_result execution_count=”13”} 1.0659046161527235 ::: :::
::: {.cell .code execution_count=”14”}
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.fill_between(x_plot, reg(x_plot) - quantile, reg(x_plot) + quantile, alpha=0.2)
plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
plt.scatter(x_train,y_train,s=0.5,color="black", label="training data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”15”}
n_test = 1000
x_test = np.random.uniform(0, 1, n_test)
y_test = f(x_test) + x_test * np.random.normal(0, 1, n_test)
:::
::: {.cell .code execution_count=”16”}
plt.plot(x_plot,reg(x_plot), label='prediction')
plt.fill_between(x_plot, reg(x_plot) - quantile, reg(x_plot) + quantile, alpha=0.2)
# plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
# plt.scatter(x_train,y_train,s=0.5,color="black", label="training data")
plt.scatter(x_test,y_test,s=0.5,color="g", label="test data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”17”}
coverage = np.sum((y_test >= reg(x_test)-quantile) & (y_test <= reg(x_test)+quantile)) / n_test
coverage
::: {.output .execute_result execution_count=”17”} 0.932 ::: :::
::: {.cell .markdown}
3. Improving local adaptivity {#3-improving-local-adaptivity}
:::
::: {.cell .markdown}
1. Quantile regression {#1-quantile-regression}
:::
::: {.cell .code execution_count=”18”}
def stack(x):
# use polynomials of degree 3 to predict quantiles
return np.column_stack((x, x**2, x**3))
:::
::: {.cell .code execution_count=”19”}
import statsmodels.api as sm
quantiles = [alpha/2, 1-alpha/2]
quantile_models = []
x_train_stack = stack(x_train)
for quantile in quantiles:
# Fit quantile regression model
quantile_model = sm.QuantReg(y_train, sm.add_constant(x_train_stack)).fit(q=quantile)
quantile_models.append(quantile_model)
for i, quantile_model in enumerate(quantile_models):
print(f"Quantile: {quantiles[i]}, Coefficients:")
print(quantile_model.summary())
::: {.output .stream .stdout} Quantile: 0.05, Coefficients: QuantReg Regression Results
============================================================================== Dep. Variable: y Pseudo R-squared: 0.4189 Model: QuantReg Bandwidth: 0.5274 Method: Least Squares Sparsity: 2.562 Date: Wed, 10 Apr 2024 No. Observations: 250 Time: 15:02:52 Df Residuals: 246 Df Model: 3 ============================================================================== coef std err t P>|t| [0.025 0.975] —————————————————————————— const 1.0091 0.216 4.678 0.000 0.584 1.434 x1 -2.8041 1.873 -1.497 0.136 -6.492 0.884 x2 -9.4769 4.257 -2.226 0.027 -17.861 -1.092 x3 10.0692 2.799 3.597 0.000 4.555 15.583 ============================================================================== Quantile: 0.95, Coefficients: QuantReg Regression Results
============================================================================== Dep. Variable: y Pseudo R-squared: 0.2129 Model: QuantReg Bandwidth: 0.5630 Method: Least Squares Sparsity: 2.482 Date: Wed, 10 Apr 2024 No. Observations: 250 Time: 15:02:52 Df Residuals: 246 Df Model: 3 ============================================================================== coef std err t P>|t| [0.025 0.975] —————————————————————————— const 0.9978 0.169 5.894 0.000 0.664 1.331 x1 1.6931 1.379 1.228 0.221 -1.023 4.409 x2 -12.5424 3.069 -4.086 0.000 -18.588 -6.497 x3 11.7889 1.928 6.115 0.000 7.991 15.586 ============================================================================== ::: :::
::: {.cell .code execution_count=”20”}
x_cal_stack = stack(x_cal)
pred_lower = quantile_models[0].predict(sm.add_constant(x_cal_stack))
pred_upper = quantile_models[1].predict(sm.add_constant(x_cal_stack))
:::
::: {.cell .code execution_count=”21”}
plt.plot(x_plot, y_plot, label='true function', color="black")
plt.scatter(x_cal, pred_lower, s=0.5, label='lower quantile prediction', color="g")
plt.scatter(x_cal, pred_upper, s=0.5, label='upper quantile prediction', color="r")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”22”}
scores = np.maximum(pred_lower - y_cal, y_cal - pred_upper)
:::
::: {.cell .code execution_count=”23”}
plt.hist(scores)
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”24”}
quantile = np.quantile(scores, np.ceil((1-alpha)*(len(x_cal)+1))/len(x_cal))
quantile
::: {.output .execute_result execution_count=”24”} 0.014922968585278196 ::: :::
::: {.cell .code execution_count=”25”}
plt.plot(x_plot,y_plot, label='true')
x_plot_stack = stack(x_plot)
q_lower_plot = quantile_models[0].predict(sm.add_constant(x_plot_stack))
q_upper_plot = quantile_models[1].predict(sm.add_constant(x_plot_stack))
plt.fill_between(x_plot, q_lower_plot - quantile, q_upper_plot + quantile, alpha=0.2)
# plt.scatter(x_cal,y_cal,s=0.5,color="r", label="calibration data")
# plt.scatter(x_train,y_train,s=0.5,color="black", label="training data")
plt.scatter(x_plot, q_lower_plot, s=0.5, label='lower quantile estimation', color="b")
plt.scatter(x_plot, q_upper_plot, s=0.5, label='upper quantile estimation', color="c")
plt.scatter(x_test,y_test,s=0.5,color="g", label="test data")
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.legend()
plt.show()
::: {.output .display_data}
::: :::
::: {.cell .code execution_count=”26”}
x_test_stack = stack(x_test)
pred_lower_test = quantile_models[0].predict(sm.add_constant(x_test_stack))
pred_upper_test = quantile_models[1].predict(sm.add_constant(x_test_stack))
:::
::: {.cell .code execution_count=”27”}
coverage = np.sum((y_test >= pred_lower_test-quantile) & (y_test <= pred_upper_test+quantile)) / n_test
coverage
::: {.output .execute_result execution_count=”27”} 0.927 ::: :::
