import numpy as np
import pandas as pd


class LinearRegression:
    '''
    Linear regression trained with mini-batch stochastic gradient descent. The constructor takes the
    learning rate, the number of iterations, the batch size, a bias flag and a verbose flag, and
    initialises the weights, the feature means and the feature standard deviations.
    '''
    def __init__(self, lr, n_iter, batch_size, add_bias, verbose):  # add degree as a parameter for polynomial features
        self.lr = lr                    # learning rate
        self.n_iter = n_iter            # number of gradient-descent iterations
        self.batch_size = batch_size    # number of rows used for each gradient step
        self.add_bias = add_bias        # if true, append a column of ones (the intercept term)
        self.verbose = verbose          # if true, print error metrics (MSE, MAE, RMSE) every 100 iterations
        # self.degree = degree          # degree for polynomial expansion (non-linear basis)
        self.w = None                   # weights/coefficients
        self.mean = None                # feature means, used for standardisation
        self.std = None                 # feature standard deviations, used for standardisation

    def prepare(self, x: pd.DataFrame) -> pd.DataFrame:
        '''
        Preparation method: casts X to a float DataFrame, standardises it and, if add_bias is true,
        appends a bias column of ones.
        '''
        x = x.copy()
        x = x.astype('float64')

        if self.mean is None:  # standardisation statistics are computed once, on the training data
            self.mean = x.mean()
            self.std = x.std(ddof=0)
            self.std = self.std.replace(0, 1)  # guard against division by zero

        x = (x - self.mean) / self.std  # standardisation formula

        if self.add_bias:  # adding bias
            x['bias'] = 1.0
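
        # Standardisation uses the z-score of each feature, computed on the training data:
        #     z_ij = (x_ij - mean_j) / std_j
        # so every feature has mean 0 and unit variance, which keeps the gradient-descent step size
        # comparable across features. The constant bias column lets the model learn the intercept
        # as just another weight.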

        '''
        # applying the polynomial transformation for non-linear bases
        if self.degree > 1:
            poly_features = pd.DataFrame()
            # create polynomial features of the given degree
            for col in x.columns:
                for d in range(2, self.degree + 1):
                    poly_features[f"{col}^{d}"] = x[col] ** d
            x = pd.concat([x, poly_features], axis=1)
        '''

        return x

    def fit(self, x: pd.DataFrame, y: pd.Series) -> "LinearRegression":
        '''
        Fit method: trains the linear model on X (a pandas DataFrame) and Y (a pandas Series) with
        mini-batch gradient descent. For each of the n_iter iterations it shuffles the row indices,
        draws mini-batches of X and Y, computes the gradient from the difference between the predicted
        and the actual batch Y values, and updates the weights. If verbose is true, it prints the error
        metrics every 100 iterations.
        '''
        x = self.prepare(x)                 # standardisation and bias column through the prepare method
        y = pd.Series(y).astype('float64')  # make sure Y is a float Series

        x_np = x.to_numpy()
        y_np = y.to_numpy()

        n_samples, n_features = x_np.shape
        w_np = np.zeros(n_features)         # initialise the weights at zero
        batch_size = self.batch_size
        # use all samples as a single batch if batch_size is None or larger than the number of samples
        if batch_size is None or batch_size >= n_samples:
            batch_size = n_samples

        # number of batches per iteration
        n_batches = int(np.ceil(n_samples / batch_size))

        for epoch in range(1, self.n_iter + 1):
            shuffled_idx = np.random.permutation(n_samples)  # random permutation of the row indices
            for b in range(n_batches):
                start = b * batch_size
                end = min(start + batch_size, n_samples)
                idx = shuffled_idx[start:end]

                # X and Y rows of the current mini-batch, taken from the shuffled indices
                x_batch = x_np[idx]
                y_batch = y_np[idx]

                # predicted Y for the batch: dot product of the X batch and the weight vector
                y_pred = x_batch.dot(w_np)

                error = y_batch - y_pred                     # difference between the batch Y and the predicted Y
                grad = -2 * x_batch.T.dot(error) / len(idx)  # MSE gradient; len(idx) also handles a smaller final batch
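                # Gradient of the batch mean squared error with respect to the weights:
                #     MSE(w)  = (1/m) * sum_i (y_i - x_i · w)^2
                #     dMSE/dw = -(2/m) * X_batch^T (y_batch - X_batch w)
                # where m is the number of rows in the batch.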

                w_np -= self.lr * grad  # gradient-descent update: move the weights against the gradient, scaled by the learning rate

            # if verbose, compute and print the error metrics on the full training set every 100 iterations
            if self.verbose and epoch % 100 == 0:
                y_full_pred = x_np.dot(w_np)
                mse = ((y_np - y_full_pred) ** 2).mean()
                mae = float(np.mean(np.abs(y_np - y_full_pred)))
                rmse = mse ** 0.5
                print(f"Iter {epoch:5d} | MSE: {mse:.6f} | MAE: {mae:.6f} | RMSE: {rmse:.6f}")

        self.w = pd.Series(w_np, index=x.columns)  # store the weights back as a pandas Series indexed by feature name
        return self

    def predict(self, x: pd.DataFrame) -> pd.Series:
        '''
        Predict method: computes the Y predictions for X by multiplying the prepared X with the
        trained weight vector.
        '''
        if self.w is None:  # the model has not been fitted yet
            raise RuntimeError("Model is not fitted yet. Call `fit` first.")

        x = self.prepare(x)  # standardisation and bias column through the prepare method
        return x.dot(self.w)

    def score(self, x: pd.DataFrame, y: pd.Series) -> float:
        '''
        Computes the coefficient of determination (R²) to assess the goodness of fit of the
        linear regression model.
        '''
        y_pred = self.predict(x)              # predict Y for X with the predict method
        y = pd.Series(y).astype('float64')
        ss_res = ((y - y_pred) ** 2).sum()    # residual sum of squares: differences between Y and the predicted Y
        ss_tot = ((y - y.mean()) ** 2).sum()  # total sum of squares: differences between Y and the mean of Y
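        # Coefficient of determination:
        #     R² = 1 - SS_res / SS_tot
        # R² = 1 is a perfect fit; R² = 0 means the model does no better than predicting the mean of Y.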
        return 1.0 - ss_res / ss_tot


if __name__ == "__main__":
    df = pd.read_csv('parkinsons_updrs.data', dtype=str)

    df.drop(columns=['subject#'], inplace=True)  # drop the subject# column

    missing_rows = df[df.isin(['?', 'NA', 'na', '']).any(axis=1)]  # check for null-like values
    print(f"Rows with null values: {len(missing_rows)}")

    df.replace(['?', 'NA', 'na', ''], pd.NA, inplace=True)  # replace null-like markers with the NA identifier

    # check data types --> no problem
    # print(df.dtypes)

    # duplicated rows?
    duplicates = df.duplicated().sum()
    print(f"Num of duplicated rows: {duplicates}")
    # no duplicates, but just in case:
    df = df.drop_duplicates()

    # convert the columns to numeric values before any numeric analysis; invalid entries become NaN
    for col in df:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    df.dropna(inplace=True)  # remove rows with null values
    print(f"\nRows remaining after drop of the null values: {len(df)}\n")

    # check for highly correlated features --> ensure uniqueness of the solution
    # find them, then note them for the 3rd phase

    # Further experiments
    # 0 indicates no correlation and 1 indicates perfect correlation
    corr_matrix = df.corr().abs()
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
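    # np.triu(..., k=1) keeps only the strict upper triangle of the correlation matrix,
    # so each feature pair is inspected once and a feature is never compared with itself.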

    # find features with a correlation greater than 0.95
    high_corr_features = []
    for col in upper.columns:
        high_corr = upper[col][upper[col] > 0.95]
        if not high_corr.empty:
            high_corr_features.append((col, high_corr.index.tolist()))

    if high_corr_features:
        print("\nCorrelated features (>0.95):")
        for feature, correlated_with in high_corr_features:
            print(f"  {feature} AND {correlated_with}")

    # check for weak correlation with the target
    target_corr = df.corr()['motor_UPDRS'].abs().sort_values(ascending=False)
    print("\nCorrelation with the target variable, in descending order:")
    print(target_corr)
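    # features with a very low correlation to the target are candidates to revisit in the 3rd phase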

    # sanity checks for data validity - realistic value ranges for the Parkinson's data
    df = df[(df['age'] >= 18) & (df['age'] <= 95)]
    df = df[(df['motor_UPDRS'] >= 0) & (df['motor_UPDRS'] <= 100)]
    df = df[(df['total_UPDRS'] >= 0) & (df['total_UPDRS'] <= 100)]
    df = df[(df['Jitter(%)'] >= 0) & (df['Jitter(%)'] <= 10)]
    df = df[(df['Shimmer(dB)'] >= 0) & (df['Shimmer(dB)'] <= 10)]

    print(f"Rows after sanity checks: {len(df)}\n")

    # check that there are no null values left
    assert df.isna().sum().sum() == 0, "There are still some null values."

    # split into X (features) and Y (target)
    feature_columns = [col for col in df.columns if col not in ['motor_UPDRS', 'total_UPDRS', 'subject#']]
    x = df[feature_columns]
    y = df['motor_UPDRS']

    # train / test split (80 / 20)
    n_train = int(0.8 * len(x))
    x_train, x_test = x.iloc[:n_train], x.iloc[n_train:]
    y_train, y_test = y.iloc[:n_train], y.iloc[n_train:]
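    # note: this keeps the original row order; a shuffled split could also be tried, e.g. (not used here):
    #     perm = np.random.permutation(len(x))
    #     x, y = x.iloc[perm], y.iloc[perm]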

    # training of the model
    model = LinearRegression(lr=0.0001, n_iter=5000, batch_size=64, add_bias=True, verbose=True)
    # other values could be used, for example (lr=0.01, n_iter=2000, batch_size=None, add_bias=True, verbose=False)
    # model = LinearRegression(lr=0.0001, n_iter=5000, batch_size=64, add_bias=True, verbose=True, degree=2)
    # using a polynomial degree for the non-linear basis calculation
    model.fit(x_train, y_train)

    # evaluation of the model
    print("\nR² on training data:", model.score(x_train, y_train))
    print("R² on testing data:", model.score(x_test, y_test))

    # predict Y values for the test data using the trained model
    preds = model.predict(x_test)
    print("\nFirst 10 predictions:")
    print(preds.head(10))

    # weight report
    print("\nWeights from the model:")
    print(model.w)