Added the Linear Regression implementations.
This commit is contained in:
parent
a1643975ba
commit
7734802cd1
11 changed files with 434 additions and 1 deletions
3
.idea/.gitignore
generated
vendored
Normal file
3
.idea/.gitignore
generated
vendored
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
8
.idea/Parkinsons-data.iml
generated
Normal file
8
.idea/Parkinsons-data.iml
generated
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="PYTHON_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="jdk" jdkName="Python 3.11" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
||||
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
<component name="InspectionProjectProfileManager">
|
||||
<settings>
|
||||
<option name="USE_PROJECT_PROFILE" value="false" />
|
||||
<version value="1.0" />
|
||||
</settings>
|
||||
</component>
|
||||
7
.idea/misc.xml
generated
Normal file
7
.idea/misc.xml
generated
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="Black">
|
||||
<option name="sdkName" value="Python 3.11" />
|
||||
</component>
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
|
||||
</project>
|
||||
8
.idea/modules.xml
generated
Normal file
8
.idea/modules.xml
generated
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/Parkinsons-data.iml" filepath="$PROJECT_DIR$/.idea/Parkinsons-data.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
||||
6
.idea/vcs.xml
generated
Normal file
6
.idea/vcs.xml
generated
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
|
|
@ -24,3 +25,135 @@ class LinearRegression:
|
|||
|
||||
|
||||
|
||||
class LinearRegression:
|
||||
'''
|
||||
Constructor for the Linear Regression with analytical. It uses bias. It also
|
||||
initializes the weight, mean and std.
|
||||
'''
|
||||
def __init__(self, add_bias):
|
||||
self.add_bias = add_bias # bias to prepend a column of ones (the intercept term)
|
||||
self.w = None # weight/coefficient
|
||||
self.mean = None # used for standardisation
|
||||
self.std = None # standard deviation
|
||||
|
||||
|
||||
def prepare(self, x: pd.DataFrame) -> pd.DataFrame:
|
||||
'''
|
||||
Preparation method to ensure X is a float DataFrame, add a bias if it is true and standardise the X.
|
||||
'''
|
||||
x = x.copy()
|
||||
x = x.astype('float64')
|
||||
|
||||
if self.mean is None: # standardisation
|
||||
self.mean = x.mean()
|
||||
self.std = x.std(ddof=0)
|
||||
self.std.replace(0, 1, inplace=True) # guard against division by zero
|
||||
|
||||
x = (x - self.mean) / self.std # standardisation formula
|
||||
|
||||
if self.add_bias: # adding bias
|
||||
x['bias'] = 1.0
|
||||
|
||||
return x
|
||||
|
||||
|
||||
def fit(self, x: pd.DataFrame, y: pd.Series) -> "LinearRegression":
|
||||
'''
|
||||
Fit method to fit X and Y datas through pandas and train the linear model by analytical solution.
|
||||
It uses pandas DataFrame for the X and Series for the Y.
|
||||
'''
|
||||
x = self.prepare(x)
|
||||
y = pd.Series(y).astype("float64")
|
||||
|
||||
# convert to numpy for speed
|
||||
x_np = x.to_numpy() # n_samples, n_features
|
||||
y_np = y.to_numpy()[:, None] # n_samples, 1
|
||||
|
||||
# w = (X^T*X)^-1*X^T*Y
|
||||
xt_x = x_np.T.dot(x_np)
|
||||
xt_y = x_np.T.dot(y_np)
|
||||
w_np = np.linalg.pinv(xt_x).dot(xt_y) # n_features, 1
|
||||
|
||||
# store weights back as a pandas series
|
||||
self.w = pd.Series(
|
||||
w_np.ravel(), # flattens the array into 1-D array
|
||||
index=x.columns
|
||||
)
|
||||
return self
|
||||
|
||||
|
||||
def predict(self, x: pd.DataFrame) -> pd.Series:
|
||||
'''
|
||||
Predict method is used to test trained data to do X prediction by multiplying X and weight vectors.
|
||||
'''
|
||||
if self.w is None: # if weight is empty, throw error
|
||||
raise RuntimeError("Model is not fitted yet. Call `fit` first.")
|
||||
|
||||
x = self.prepare(x) # standardisation and adding bias through prepare method
|
||||
return x.dot(self.w)
|
||||
|
||||
def score(self, x: pd.DataFrame, y: pd.Series) -> float:
|
||||
'''
|
||||
This method is used to calculate coefficient of determination to assess the goodness
|
||||
of fit from a regression model
|
||||
'''
|
||||
y_pred = self.predict(x) # predicts Y value with X predict method.
|
||||
y = pd.Series(y).astype('float64')
|
||||
ss_res = ((y - y_pred) ** 2).sum()
|
||||
# sum of squared residuals, residuals are difference between Y values and Y prediction values
|
||||
ss_tot = ((y - y.mean()) ** 2).sum()
|
||||
# total sum of squares, uses the difference between Y values and Y mean value
|
||||
return 1.0 - ss_res / ss_tot
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
df = pd.read_csv('parkinsons_updrs.data', dtype=str)
|
||||
|
||||
df.drop(columns=['subject#'], inplace=True) # drops subject# column
|
||||
|
||||
missing_rows = df[df.isin(['?', 'NA', 'na', '']).any(axis=1)] # checks null values
|
||||
print(f"Rows with null values: {len(missing_rows)}")
|
||||
|
||||
df.replace(['?','NA', 'na', ''], pd.NA, inplace=True) # replace null values with NA identifier
|
||||
|
||||
num_cols = [
|
||||
'age', 'sex', 'test_time', 'motor_UPDRS', 'total_UPDRS',
|
||||
'Jitter(%)', 'Jitter(Abs)', 'Jitter:RAP', 'Jitter:PPQ5', 'Jitter:DDP',
|
||||
'Shimmer', 'Shimmer(dB)', 'Shimmer:APQ3', 'Shimmer:APQ5',
|
||||
'Shimmer:APQ11', 'Shimmer:DDA', 'NHR', 'HNR', 'RPDE', 'DFA', 'PPE'
|
||||
]
|
||||
|
||||
for col in num_cols:
|
||||
df[col] = pd.to_numeric(df[col], errors='coerce') # convert columns to numeric values
|
||||
|
||||
df.dropna(inplace=True) # remove null values
|
||||
print(f"Rows remaining after drop of the null values: {len(df)}")
|
||||
|
||||
# check if there are still null values
|
||||
assert df.isna().sum().sum() == 0, "There are still some null values."
|
||||
|
||||
# split the X and Y values
|
||||
target = 'total_UPDRS'
|
||||
x = df.drop(columns=[target])
|
||||
y = df[target]
|
||||
|
||||
# train / test splitting (80 / 20)
|
||||
n_train = int(0.8 * len(x))
|
||||
x_train, x_test = x.iloc[:n_train], x.iloc[n_train:]
|
||||
y_train, y_test = y.iloc[:n_train], y.iloc[n_train:]
|
||||
|
||||
# training of the model
|
||||
model = LinearRegression(add_bias=True)
|
||||
model.fit(x_train, y_train)
|
||||
|
||||
# evaluation of the model
|
||||
print("\nR² on training data:", model.score(x_train, y_train))
|
||||
print("\nR² on testing data:", model.score(x_test, y_test))
|
||||
|
||||
# predict Y values using the trained data
|
||||
preds = model.predict(x_test)
|
||||
print("\nFirst 5 predictions:")
|
||||
print(preds.head())
|
||||
|
||||
print("\nWeights:")
|
||||
print(model.w.round(4))
|
||||
|
|
|
|||
|
|
@ -1 +1,49 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
'''
|
||||
class LogisticRegression:
|
||||
def __init__(self):
|
||||
|
||||
def prepare(self):
|
||||
|
||||
def fit(self):
|
||||
|
||||
def predict(self):
|
||||
|
||||
def score(self):
|
||||
'''
|
||||
|
||||
if __name__ == "__main__":
|
||||
columns = [
|
||||
'ID', 'Diagnosis',
|
||||
'radius_mean', 'texture_mean', 'perimeter_mean', 'area_mean', 'smoothness_mean',
|
||||
'compactness_mean', 'concavity_mean', 'concave_points_mean', 'symmetry_mean', 'fractal_dimension_mean',
|
||||
'radius_se', 'texture_se', 'perimeter_se', 'area_se', 'smoothness_se',
|
||||
'compactness_se', 'concavity_se', 'concave_points_se', 'symmetry_se', 'fractal_dimension_se',
|
||||
'radius_worst', 'texture_worst', 'perimeter_worst', 'area_worst', 'smoothness_worst',
|
||||
'compactness_worst', 'concavity_worst', 'concave_points_worst', 'symmetry_worst', 'fractal_dimension_worst'
|
||||
]
|
||||
|
||||
df = pd.read_csv('wdbc.data', header=None, names=columns, dtype=str)
|
||||
|
||||
df.drop(columns=['ID'], inplace=True) # drops id column
|
||||
|
||||
missing_rows = df[df.isin(['?', 'NA', 'na', '']).any(axis=1)] # checks null values
|
||||
print(f"Rows with null values: {len(missing_rows)}")
|
||||
|
||||
df.replace(['?','NA', 'na', ''], pd.NA, inplace=True) # replace null values with NA identifier
|
||||
|
||||
num_cols = df.columns.difference(['Diagnosis'])
|
||||
for col in num_cols:
|
||||
df[col] = pd.to_numeric(df[col], errors='coerce') # convert columns to numeric values
|
||||
|
||||
df.dropna(inplace=True) # remove null values
|
||||
print(f"Rows remaining after drop of the null values: {len(df)}")
|
||||
for col in num_cols:
|
||||
df = df[df[col] >= 0]
|
||||
|
||||
# check if there are still null values
|
||||
assert df.isna().sum().sum() == 0, "There are still some null values."
|
||||
|
||||
df['Diagnosis'] = df['Diagnosis'].astype('category')
|
||||
166
mini-batch-sgd-linear-regression-parkinsons.py
Normal file
166
mini-batch-sgd-linear-regression-parkinsons.py
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
class LinearRegression:
|
||||
'''
|
||||
Constructor for the Linear Regression with mini‑batch stochastic gradient descent. It uses learning rate,
|
||||
iteration number, batch size, bias and verbose. It also initializes the weight, mean and std.
|
||||
'''
|
||||
def __init__(self, lr, n_iter, batch_size, add_bias, verbose):
|
||||
self.lr = lr # learning rate
|
||||
self.n_iter = n_iter # number of gradient-descent iterations
|
||||
self.batch_size = batch_size # row number for each gradient step
|
||||
self.add_bias = add_bias # bias to prepend a column of ones (the intercept term)
|
||||
self.verbose = verbose # if true, prints the mean‑squared error every 100 iterations
|
||||
self.w = None # weight/coefficient
|
||||
self.mean = None # used for standardisation
|
||||
self.std = None # standard deviation
|
||||
|
||||
def prepare(self, x: pd.DataFrame) -> pd.DataFrame:
|
||||
'''
|
||||
Preparation method to ensure X is a float DataFrame, add a bias if it is true and standardise the X.
|
||||
'''
|
||||
x = x.copy()
|
||||
x = x.astype('float64')
|
||||
|
||||
if self.mean is None: # standardisation
|
||||
self.mean = x.mean()
|
||||
self.std = x.std(ddof=0)
|
||||
self.std.replace(0, 1, inplace=True) # guard against division by zero
|
||||
|
||||
x = (x - self.mean) / self.std # standardisation formula
|
||||
|
||||
if self.add_bias: # adding bias
|
||||
x['bias'] = 1.0
|
||||
|
||||
return x
|
||||
|
||||
|
||||
def fit(self, x: pd.DataFrame, y: pd.Series) -> "LinearRegression":
|
||||
'''
|
||||
Fit method to fit X and Y datas through pandas and train the linear model by gradient descent.
|
||||
It uses pandas DataFrame for the X and Series for the Y. For the n iterations, it returns batch X and Y values
|
||||
from random subset of indices calculates gradient from differences between predicted Y and batch Y values and
|
||||
calculates the weight. If verbose it prints the mean square error for each 100 iterations.
|
||||
'''
|
||||
x = self.prepare(x) # standardisation and adding bias through prepare method
|
||||
y = pd.Series(y).astype('float64') # check if Y is series.
|
||||
|
||||
x_np = x.to_numpy()
|
||||
y_np = y.to_numpy()
|
||||
|
||||
n_samples, n_features = x_np.shape # n samples
|
||||
w_np = np.zeros(n_features) # initialize weight as zero
|
||||
batch_size = self.batch_size
|
||||
# defines n samples as batch size if size is none or bigger than n samples
|
||||
if batch_size is None or batch_size >= n_samples:
|
||||
batch_size = n_samples
|
||||
|
||||
# number of batches per iteration
|
||||
n_batches = int(np.ceil(n_samples / batch_size))
|
||||
|
||||
for epoch in range(1, self.n_iter + 1):
|
||||
shuffled_idx = np.random.permutation(n_samples) # random permutation of the indices
|
||||
for b in range(n_batches):
|
||||
start = b * batch_size
|
||||
end = start + batch_size
|
||||
idx = shuffled_idx[start:end]
|
||||
|
||||
x_batch = x_np[idx]
|
||||
y_batch = y_np[idx]
|
||||
# it returns X and Y batch values from a randomly permuted indices from start to end
|
||||
|
||||
y_pred = x_batch.dot(w_np)
|
||||
# makes Y prediction value for X batch value by multiplying X and weight vectors.
|
||||
|
||||
error = y_batch - y_pred # error is difference between Y batch value and Y prediction value
|
||||
grad = -2 * x_batch.T.dot(error) / batch_size
|
||||
# gradient is calculated by multiplication of error, transposed X batch value and -2 divided by batch size
|
||||
|
||||
w_np -= self.lr * grad # weight is decreased by multiplication of learning rate and gradient
|
||||
|
||||
# if verbose, it calculates the mean squared error every 100 iterations and displays it
|
||||
if self.verbose and epoch % 100 == 0:
|
||||
y_full_pred = x.dot(w_np)
|
||||
mse = ((y_np - y_full_pred) ** 2).mean()
|
||||
print(f"Iter {epoch:5d} | MSE: {mse:.6f}")
|
||||
|
||||
self.w = pd.Series(w_np, index=x.columns) # store weights back as a pandas series
|
||||
return self
|
||||
|
||||
def predict(self, x: pd.DataFrame) -> pd.Series:
|
||||
'''
|
||||
Predict method makes X prediction by multiplying X and weight vectors.
|
||||
'''
|
||||
if self.w is None: # if weight is empty, throw error
|
||||
raise RuntimeError("Model is not fitted yet. Call `fit` first.")
|
||||
|
||||
x = self.prepare(x) # standardisation and adding bias through prepare method
|
||||
return x.dot(self.w)
|
||||
|
||||
def score(self, x: pd.DataFrame, y: pd.Series) -> float:
|
||||
'''
|
||||
This method is used to calculate coefficient of determination to assess the goodness
|
||||
of fit from a regression model
|
||||
'''
|
||||
y_pred = self.predict(x) # predicts Y value with X predict method.
|
||||
y = pd.Series(y).astype('float64')
|
||||
ss_res = ((y - y_pred) ** 2).sum()
|
||||
# sum of squared residuals, residuals are difference between Y values and Y prediction values
|
||||
ss_tot = ((y - y.mean()) ** 2).sum()
|
||||
# total sum of squares, uses the difference between Y values and Y mean value
|
||||
return 1.0 - ss_res / ss_tot
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
df = pd.read_csv('parkinsons_updrs.data', dtype=str)
|
||||
|
||||
df.drop(columns=['subject#'], inplace=True) # drops subject# column
|
||||
|
||||
missing_rows = df[df.isin(['?', 'NA', 'na', '']).any(axis=1)] # checks null values
|
||||
print(f"Rows with null values: {len(missing_rows)}")
|
||||
|
||||
df.replace(['?','NA', 'na', ''], pd.NA, inplace=True) # replace null values with NA identifier
|
||||
|
||||
num_cols = [
|
||||
'age', 'sex', 'test_time', 'motor_UPDRS', 'total_UPDRS',
|
||||
'Jitter(%)', 'Jitter(Abs)', 'Jitter:RAP', 'Jitter:PPQ5', 'Jitter:DDP',
|
||||
'Shimmer', 'Shimmer(dB)', 'Shimmer:APQ3', 'Shimmer:APQ5',
|
||||
'Shimmer:APQ11', 'Shimmer:DDA', 'NHR', 'HNR', 'RPDE', 'DFA', 'PPE'
|
||||
]
|
||||
|
||||
for col in num_cols:
|
||||
df[col] = pd.to_numeric(df[col], errors='coerce') # convert columns to numeric values
|
||||
|
||||
df.dropna(inplace=True) # remove null values
|
||||
print(f"Rows remaining after drop of the null values: {len(df)}")
|
||||
|
||||
# check if there are still null values
|
||||
assert df.isna().sum().sum() == 0, "There are still some null values."
|
||||
|
||||
# split the X and Y values
|
||||
target = 'total_UPDRS'
|
||||
x = df.drop(columns=[target])
|
||||
y = df[target]
|
||||
|
||||
# train / test splitting (80 / 20)
|
||||
n_train = int(0.8 * len(x))
|
||||
x_train, x_test = x.iloc[:n_train], x.iloc[n_train:]
|
||||
y_train, y_test = y.iloc[:n_train], y.iloc[n_train:]
|
||||
|
||||
# training of the model
|
||||
model = LinearRegression(lr=0.0001, n_iter=5000, batch_size=64, add_bias=True, verbose=True)
|
||||
# other values could be used, for example (lr=0.01, n_iter=2000, batch_size=None, add_bias=True, verbose=False)
|
||||
model.fit(x_train, y_train)
|
||||
|
||||
# evaluation of the model
|
||||
print("\nR² on training data:", model.score(x_train, y_train))
|
||||
print("\nR² on testing data:", model.score(x_test, y_test))
|
||||
|
||||
# predict Y values using the trained data
|
||||
preds = model.predict(x_test)
|
||||
print("\nFirst 5 predictions:")
|
||||
print(preds.head())
|
||||
|
||||
print("\nWeights:")
|
||||
print(model.w.round(4))
|
||||
49
mini-batch-sgd-logistic-regression-wdbc.py
Normal file
49
mini-batch-sgd-logistic-regression-wdbc.py
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
'''
|
||||
class LogisticRegression:
|
||||
def __init__(self):
|
||||
|
||||
def prepare(self):
|
||||
|
||||
def fit(self):
|
||||
|
||||
def predict(self):
|
||||
|
||||
def score(self):
|
||||
'''
|
||||
|
||||
if __name__ == "__main__":
|
||||
columns = [
|
||||
'ID', 'Diagnosis',
|
||||
'radius_mean', 'texture_mean', 'perimeter_mean', 'area_mean', 'smoothness_mean',
|
||||
'compactness_mean', 'concavity_mean', 'concave_points_mean', 'symmetry_mean', 'fractal_dimension_mean',
|
||||
'radius_se', 'texture_se', 'perimeter_se', 'area_se', 'smoothness_se',
|
||||
'compactness_se', 'concavity_se', 'concave_points_se', 'symmetry_se', 'fractal_dimension_se',
|
||||
'radius_worst', 'texture_worst', 'perimeter_worst', 'area_worst', 'smoothness_worst',
|
||||
'compactness_worst', 'concavity_worst', 'concave_points_worst', 'symmetry_worst', 'fractal_dimension_worst'
|
||||
]
|
||||
|
||||
df = pd.read_csv('wdbc.data', header=None, names=columns, dtype=str)
|
||||
|
||||
df.drop(columns=['ID'], inplace=True) # drops id column
|
||||
|
||||
missing_rows = df[df.isin(['?', 'NA', 'na', '']).any(axis=1)] # checks null values
|
||||
print(f"Rows with null values: {len(missing_rows)}")
|
||||
|
||||
df.replace(['?','NA', 'na', ''], pd.NA, inplace=True) # replace null values with NA identifier
|
||||
|
||||
num_cols = df.columns.difference(['Diagnosis'])
|
||||
for col in num_cols:
|
||||
df[col] = pd.to_numeric(df[col], errors='coerce') # convert columns to numeric values
|
||||
|
||||
df.dropna(inplace=True) # remove null values
|
||||
print(f"Rows remaining after drop of the null values: {len(df)}")
|
||||
for col in num_cols:
|
||||
df = df[df[col] >= 0]
|
||||
|
||||
# check if there are still null values
|
||||
assert df.isna().sum().sum() == 0, "There are still some null values."
|
||||
|
||||
df['Diagnosis'] = df['Diagnosis'].astype('category')
|
||||
|
|
@ -1 +0,0 @@
|
|||
|
||||
Loading…
Add table
Add a link
Reference in a new issue