From 6c0803f6a31ef86212bebaa103f9b33575dde98f Mon Sep 17 00:00:00 2001
From: rahulkhorana
Date: Sun, 24 Mar 2024 16:42:20 -0700
Subject: [PATCH] formatting fix

---
 experiments/esol_experiment.py          | 138 +++++++----
 experiments/freesolv_experiment.py      | 135 +++++++----
 experiments/gaussian_process.py         | 165 ++++++++-----
 experiments/kernels.py                  |  33 ++-
 experiments/lipophilicity_experiment.py | 135 +++++++----
 experiments/load_process_data.py        | 296 +++++++++++++++++-------
 experiments/photoswitches_experiment.py | 154 ++++++++----
 src/atomic_complex.py                   | 143 +++++++-----
 src/build_atom_lookup.py                |  58 ++---
 src/building_blocks.py                  |  61 +++--
 src/core_utils.py                       |  48 ++--
 src/general_utils.py                    | 106 ++++++---
 src/polyatomic_complex.py               | 156 +++++++++----
 src/process_esol.py                     |  61 +++--
 src/process_freesolv.py                 |  61 +++--
 src/process_lipophilicity.py            |  61 +++--
 src/process_materials_project.py        |  42 ++--
 src/process_photoswitches.py            |  61 +++--
 18 files changed, 1263 insertions(+), 651 deletions(-)

diff --git a/experiments/esol_experiment.py b/experiments/esol_experiment.py
index aaf011f..04f21d5 100644
--- a/experiments/esol_experiment.py
+++ b/experiments/esol_experiment.py
@@ -5,10 +5,10 @@
 import numpy as np
 from load_process_data import LoadDatasetForTask
 
-#botorch specific
+# botorch specific
 from botorch.models.gp_regression import ExactGP
 
-#gpytorch specific
+# gpytorch specific
 from gpytorch.means import ConstantMean
 from gpytorch.kernels import ScaleKernel, RBFKernel
 from gpytorch.distributions import MultivariateNormal
@@ -25,8 +25,8 @@
 if torch.cuda.is_available():
     dev = "cuda:0"
     torch.cuda.empty_cache()
-else: 
-    dev = "cpu" 
+else:
+    dev = "cpu"
 device = torch.device(dev)
@@ -41,6 +41,7 @@ def forward(self, x):
         covar_x = self.covar_module(x)
         return MultivariateNormal(mean_x, covar_x)
 
+
 class GraphGP(SIGP):
     def __init__(self, train_x, train_y, likelihood, kernel, **kernel_kwargs):
         super().__init__(train_x, train_y, likelihood)
@@ -61,67 +62,122 @@ def forward(self, x):
         return MultivariateNormal(mean, covariance)
 
 
-def initialize_model(train_x:torch.Tensor, train_obj:torch.Tensor, likelihood):
+def initialize_model(train_x: torch.Tensor, train_obj: torch.Tensor, likelihood):
     model = ExactGPModel(train_x, train_obj, likelihood).to(train_x)
     return model
 
+
 def initialize_graph_gp(train_x, train_obj, likelihood, kernel, **kernel_kwargs):
-    model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) 
+    model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs)
     return model
 
+
 def one_experiment(target, encoding, n_trials, n_iters):
-    X,y = [], []
-    if encoding == 'complexes':
-        X,y = LoadDatasetForTask(X='dataset/esol/fast_complex_lookup_repn.pkl', y='dataset/esol/ESOL.csv', repn=encoding, y_column=target).load_esol()
-    elif encoding == 'deep_complexes':
-        X,y = LoadDatasetForTask(X='dataset/esol/deep_complex_lookup_repn.pkl',y='dataset/esol/ESOL.csv',repn=encoding, y_column=target).load_esol()
-    elif ENCODING == 'fingerprints':
-        X,y = LoadDatasetForTask(X='gauche_ecfp', y='dataset/esol/ESOL.csv', repn=encoding, y_column=target).load_esol()
-    elif ENCODING == 'SELFIES':
-        X,y = LoadDatasetForTask(X='gauche_selfies', y='dataset/esol/ESOL.csv', repn=encoding, y_column=target).load_esol()
-    elif ENCODING == 'GRAPHS':
-        X,y = LoadDatasetForTask(X='gauche_graphs', y='dataset/esol/ESOL.csv', repn=encoding, y_column=target).load_esol()
-
-    if ENCODING != 'GRAPHS':
-        r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_model(initialize_model=initialize_model, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png')
+    X, y = [], []
+    if encoding == "complexes":
+        X, y = LoadDatasetForTask(
+            X="dataset/esol/fast_complex_lookup_repn.pkl",
+            y="dataset/esol/ESOL.csv",
+            repn=encoding,
+            y_column=target,
+        ).load_esol()
+    elif encoding == "deep_complexes":
+        X, y = LoadDatasetForTask(
+            X="dataset/esol/deep_complex_lookup_repn.pkl",
+            y="dataset/esol/ESOL.csv",
+            repn=encoding,
+            y_column=target,
+        ).load_esol()
+    elif ENCODING == "fingerprints":
+        X, y = LoadDatasetForTask(
+            X="gauche_ecfp", y="dataset/esol/ESOL.csv", repn=encoding, y_column=target
+        ).load_esol()
+    elif ENCODING == "SELFIES":
+        X, y = LoadDatasetForTask(
+            X="gauche_selfies",
+            y="dataset/esol/ESOL.csv",
+            repn=encoding,
+            y_column=target,
+        ).load_esol()
+    elif ENCODING == "GRAPHS":
+        X, y = LoadDatasetForTask(
+            X="gauche_graphs", y="dataset/esol/ESOL.csv", repn=encoding, y_column=target
+        ).load_esol()
+
+    if ENCODING != "GRAPHS":
+        r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = (
+            evaluate_model(
+                initialize_model=initialize_model,
+                n_trials=n_trials,
+                n_iters=n_iters,
+                test_set_size=holdout_set_size,
+                X=X,
+                y=y,
+                figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png",
+            )
+        )
     else:
-        r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_graph_model(initialize_graph_gp, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png')
-
-    mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format(np.mean(r2_list), np.std(r2_list)/np.sqrt(len(r2_list)))
-    mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format(np.mean(rmse_list), np.std(rmse_list)/np.sqrt(len(rmse_list)))
-    mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format(np.mean(mae_list), np.std(mae_list)/np.sqrt(len(mae_list)))
+        r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = (
+            evaluate_graph_model(
+                initialize_graph_gp,
+                n_trials=n_trials,
+                n_iters=n_iters,
+                test_set_size=holdout_set_size,
+                X=X,
+                y=y,
+                figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png",
+            )
+        )
+
+    mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format(
+        np.mean(r2_list), np.std(r2_list) / np.sqrt(len(r2_list))
+    )
+    mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format(
+        np.mean(rmse_list), np.std(rmse_list) / np.sqrt(len(rmse_list))
+    )
+    mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format(
+        np.mean(mae_list), np.std(mae_list) / np.sqrt(len(mae_list))
+    )
     return mean_r2, mean_rmse, mean_mae
 
 
-if __name__ == '__main__':
-    EXPERIMENT_TYPE = 'ESOL'
-    ENCODING = 'GRAPHS'
+if __name__ == "__main__":
+    EXPERIMENT_TYPE = "ESOL"
+    ENCODING = "GRAPHS"
     N_TRIALS = 20
     N_ITERS = 5
     holdout_set_size = 0.33
 
     # dataset processing
-    X,y = [], []
+    X, y = [], []
 
     # dataset loading
-    possible_target_cols = ['ESOL predicted log solubility in mols per litre','Minimum Degree','Molecular Weight','Number of H-Bond Donors','Number of Rings','Number of Rotatable Bonds','Polar Surface Area','measured log solubility in mols per litre']
+    possible_target_cols = [
+        "ESOL predicted log solubility in mols per litre",
+        "Minimum Degree",
+        "Molecular Weight",
+        "Number of H-Bond Donors",
+        "Number of Rings",
+        "Number of Rotatable Bonds",
+        "Polar Surface Area",
+        "measured log solubility in mols per 
litre", + ] results = [] - + for col in possible_target_cols: - mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING,N_TRIALS,N_ITERS) + mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING, N_TRIALS, N_ITERS) results.append([col, mean_r2, mean_rmse, mean_mae]) - if type(EXPERIMENT_TYPE) is str: - trial_num = len(os.listdir(f'results/{EXPERIMENT_TYPE}')) + trial_num = len(os.listdir(f"results/{EXPERIMENT_TYPE}")) results_path = f"results/{EXPERIMENT_TYPE}/{ENCODING}_{time.time()}.txt" - with open(results_path, 'w') as f: - f.write(EXPERIMENT_TYPE + ':') - f.write('\n') - f.write(ENCODING + ':') + with open(results_path, "w") as f: + f.write(EXPERIMENT_TYPE + ":") + f.write("\n") + f.write(ENCODING + ":") for result in results: col, mean_r2, mean_rmse, mean_mae = result - f.write(f'column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}') - f.write('\n') + f.write(f"column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}") + f.write("\n") f.close() - print("CONCLUDED") \ No newline at end of file + print("CONCLUDED") diff --git a/experiments/freesolv_experiment.py b/experiments/freesolv_experiment.py index b725d80..d3f37e9 100644 --- a/experiments/freesolv_experiment.py +++ b/experiments/freesolv_experiment.py @@ -5,10 +5,10 @@ import numpy as np from load_process_data import LoadDatasetForTask -#botorch specific +# botorch specific from botorch.models.gp_regression import ExactGP -#gpytorch specific +# gpytorch specific from gpytorch.means import ConstantMean from gpytorch.kernels import ScaleKernel, RBFKernel from gpytorch.distributions import MultivariateNormal @@ -25,8 +25,8 @@ if torch.cuda.is_available(): dev = "cuda:0" torch.cuda.empty_cache() -else: - dev = "cpu" +else: + dev = "cpu" device = torch.device(dev) @@ -41,6 +41,7 @@ def forward(self, x): covar_x = self.covar_module(x) return MultivariateNormal(mean_x, covar_x) + class GraphGP(SIGP): def __init__(self, train_x, train_y, likelihood, kernel, **kernel_kwargs): super().__init__(train_x, train_y, likelihood) @@ -61,67 +62,119 @@ def forward(self, x): return MultivariateNormal(mean, covariance) -def initialize_model(train_x:torch.Tensor, train_obj:torch.Tensor, likelihood): +def initialize_model(train_x: torch.Tensor, train_obj: torch.Tensor, likelihood): model = ExactGPModel(train_x, train_obj, likelihood).to(train_x) return model + def initialize_graph_gp(train_x, train_obj, likelihood, kernel, **kernel_kwargs): - model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) + model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) return model + def one_experiment(target, encoding, n_trials, n_iters): - X,y = [], [] - if encoding == 'complexes': - X,y = LoadDatasetForTask(X='dataset/free_solv/fast_complex_lookup_repn.pkl', y='dataset/free_solv/FreeSolv.csv', repn=encoding, y_column=target).load_freesolv() - elif encoding == 'deep_complexes': - X,y = LoadDatasetForTask(X='dataset/free_solv/deep_complex_lookup_repn.pkl',y='dataset/free_solv/FreeSolv.csv',repn=encoding, y_column=target).load_freesolv() - elif ENCODING == 'fingerprints': - X,y = LoadDatasetForTask(X='gauche_ecfp', y='dataset/free_solv/FreeSolv.csv', repn=encoding, y_column=target).load_freesolv() - elif ENCODING == 'SELFIES': - X,y = LoadDatasetForTask(X='gauche_selfies', y='dataset/free_solv/FreeSolv.csv', repn=encoding, y_column=target).load_freesolv() - elif ENCODING == 'GRAPHS': - X,y = LoadDatasetForTask(X='gauche_graphs', y='dataset/free_solv/FreeSolv.csv', repn=encoding, y_column=target).load_freesolv() - - if ENCODING != 
'GRAPHS': - r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_model(initialize_model=initialize_model, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png') + X, y = [], [] + if encoding == "complexes": + X, y = LoadDatasetForTask( + X="dataset/free_solv/fast_complex_lookup_repn.pkl", + y="dataset/free_solv/FreeSolv.csv", + repn=encoding, + y_column=target, + ).load_freesolv() + elif encoding == "deep_complexes": + X, y = LoadDatasetForTask( + X="dataset/free_solv/deep_complex_lookup_repn.pkl", + y="dataset/free_solv/FreeSolv.csv", + repn=encoding, + y_column=target, + ).load_freesolv() + elif ENCODING == "fingerprints": + X, y = LoadDatasetForTask( + X="gauche_ecfp", + y="dataset/free_solv/FreeSolv.csv", + repn=encoding, + y_column=target, + ).load_freesolv() + elif ENCODING == "SELFIES": + X, y = LoadDatasetForTask( + X="gauche_selfies", + y="dataset/free_solv/FreeSolv.csv", + repn=encoding, + y_column=target, + ).load_freesolv() + elif ENCODING == "GRAPHS": + X, y = LoadDatasetForTask( + X="gauche_graphs", + y="dataset/free_solv/FreeSolv.csv", + repn=encoding, + y_column=target, + ).load_freesolv() + + if ENCODING != "GRAPHS": + r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = ( + evaluate_model( + initialize_model=initialize_model, + n_trials=n_trials, + n_iters=n_iters, + test_set_size=holdout_set_size, + X=X, + y=y, + figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png", + ) + ) else: - r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_graph_model(initialize_graph_gp, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png') - - mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format(np.mean(r2_list), np.std(r2_list)/np.sqrt(len(r2_list))) - mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format(np.mean(rmse_list), np.std(rmse_list)/np.sqrt(len(rmse_list))) - mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format(np.mean(mae_list), np.std(mae_list)/np.sqrt(len(mae_list))) + r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = ( + evaluate_graph_model( + initialize_graph_gp, + n_trials=n_trials, + n_iters=n_iters, + test_set_size=holdout_set_size, + X=X, + y=y, + figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png", + ) + ) + + mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format( + np.mean(r2_list), np.std(r2_list) / np.sqrt(len(r2_list)) + ) + mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format( + np.mean(rmse_list), np.std(rmse_list) / np.sqrt(len(rmse_list)) + ) + mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format( + np.mean(mae_list), np.std(mae_list) / np.sqrt(len(mae_list)) + ) return mean_r2, mean_rmse, mean_mae -if __name__ == '__main__': - EXPERIMENT_TYPE = 'FreeSolv' - ENCODING = 'GRAPHS' +if __name__ == "__main__": + EXPERIMENT_TYPE = "FreeSolv" + ENCODING = "GRAPHS" N_TRIALS = 20 N_ITERS = 5 holdout_set_size = 0.33 # dataset processing - X,y = [], [] + X, y = [], [] # dataset loading - possible_target_cols = ['expt','calc'] + possible_target_cols = ["expt", "calc"] results = [] - + for col in possible_target_cols: - mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING,N_TRIALS,N_ITERS) + mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING, N_TRIALS, N_ITERS) results.append([col, mean_r2, mean_rmse, 
mean_mae]) - if type(EXPERIMENT_TYPE) is str: - trial_num = len(os.listdir(f'results/{EXPERIMENT_TYPE}')) + trial_num = len(os.listdir(f"results/{EXPERIMENT_TYPE}")) results_path = f"results/{EXPERIMENT_TYPE}/{ENCODING}_{time.time()}.txt" - with open(results_path, 'w') as f: - f.write(EXPERIMENT_TYPE + ':') - f.write('\n') - f.write(ENCODING + ':') + with open(results_path, "w") as f: + f.write(EXPERIMENT_TYPE + ":") + f.write("\n") + f.write(ENCODING + ":") for result in results: col, mean_r2, mean_rmse, mean_mae = result - f.write(f'column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}') - f.write('\n') + f.write(f"column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}") + f.write("\n") f.close() - print("CONCLUDED") \ No newline at end of file + print("CONCLUDED") diff --git a/experiments/gaussian_process.py b/experiments/gaussian_process.py index d4f2090..3885871 100644 --- a/experiments/gaussian_process.py +++ b/experiments/gaussian_process.py @@ -16,12 +16,12 @@ import numpy as np import matplotlib.pyplot as plt -#botorch specific +# botorch specific from botorch import fit_gpytorch_model from botorch.acquisition import ExpectedImprovement import warnings -#sklearn specific +# sklearn specific from sklearn.model_selection import train_test_split from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error @@ -33,12 +33,13 @@ from gauche import NonTensorialInputs from gauche.kernels.graph_kernels import WeisfeilerLehmanKernel -if torch.cuda.is_available(): - dev = "cuda:0" -else: - dev = "cpu" +if torch.cuda.is_available(): + dev = "cuda:0" +else: + dev = "cpu" device = torch.device(dev) + def optimize_acqf_and_get_observation(acq_func, heldout_inputs, heldout_outputs): # Loop over the discrete set of points to evaluate the acquisition function at. acq_vals = [] @@ -53,8 +54,12 @@ def optimize_acqf_and_get_observation(acq_func, heldout_inputs, heldout_outputs) new_obj = heldout_outputs[best_idx].unsqueeze(-1) # add output dimension # Delete the selected input and value from the heldout set. - heldout_inputs = torch.cat((heldout_inputs[:best_idx], heldout_inputs[best_idx+1:]), axis=0) - heldout_outputs = torch.cat((heldout_outputs[:best_idx], heldout_outputs[best_idx+1:]), axis=0) + heldout_inputs = torch.cat( + (heldout_inputs[:best_idx], heldout_inputs[best_idx + 1 :]), axis=0 + ) + heldout_outputs = torch.cat( + (heldout_outputs[:best_idx], heldout_outputs[best_idx + 1 :]), axis=0 + ) return new_x, new_obj, heldout_inputs, heldout_outputs @@ -66,37 +71,56 @@ def update_random_observations(best_random, heldout_inputs, heldout_outputs): best_random.append(max(best_random[-1], next_random_best)) # Delete the selected input and value from the heldout set. 
- heldout_inputs = torch.cat((heldout_inputs[:index], heldout_inputs[index+1:]), axis=0) - heldout_outputs = torch.cat((heldout_outputs[:index], heldout_outputs[index+1:]), axis=0) + heldout_inputs = torch.cat( + (heldout_inputs[:index], heldout_inputs[index + 1 :]), axis=0 + ) + heldout_outputs = torch.cat( + (heldout_outputs[:index], heldout_outputs[index + 1 :]), axis=0 + ) return best_random, heldout_inputs, heldout_outputs -def transform_batched_tensor(tensors:list, max_len=None) -> Tuple[torch.Tensor, int]: + +def transform_batched_tensor(tensors: list, max_len=None) -> Tuple[torch.Tensor, int]: arr = [] for t in tensors: i, j, k, l = t - assert type(i) is torch.Tensor and type(j) is torch.Tensor and type(k) is torch.Tensor and type(l) is torch.Tensor + assert ( + type(i) is torch.Tensor + and type(j) is torch.Tensor + and type(k) is torch.Tensor + and type(l) is torch.Tensor + ) t = torch.concat([i.flatten(), j.flatten(), k.flatten(), l.flatten()]) arr.append(t) if max_len is None: max_len = max([x.squeeze().numel() for x in arr]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in arr] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in arr + ] data = torch.stack(data) return data, max_len - -def run_training_loop(initialize_model, n_trials, n_iters, holdout_size, X, y, verbose=False): + +def run_training_loop( + initialize_model, n_trials, n_iters, holdout_size, X, y, verbose=False +): best_observed_all_ei, best_random_all = [], [] warnings.filterwarnings("ignore") - warnings.filterwarnings('ignore', category=RuntimeWarning) + warnings.filterwarnings("ignore", category=RuntimeWarning) for trial in range(1, n_trials + 1): print(f"\nTrial {trial:>2} of {n_trials} ", end="") best_observed_ei, best_random = [], [] # Generate initial training data and initialize model - train_x_ei, heldout_x_ei, train_y_ei, heldout_y_ei = train_test_split(X, y, test_size=holdout_size, random_state=trial) + train_x_ei, heldout_x_ei, train_y_ei, heldout_y_ei = train_test_split( + X, y, test_size=holdout_size, random_state=trial + ) best_observed_value_ei = torch.max(train_y_ei) # Convert numpy arrays to PyTorch tensors and flatten the label vectors @@ -123,20 +147,26 @@ def run_training_loop(initialize_model, n_trials, n_iters, holdout_size, X, y, v fit_gpytorch_model(mll_ei) # Use analytic acquisition function for batch size of 1. 
- EI = ExpectedImprovement(model=model_ei, best_f=(train_y_ei.to(train_y_ei)).max()) + EI = ExpectedImprovement( + model=model_ei, best_f=(train_y_ei.to(train_y_ei)).max() + ) - new_x_ei, new_obj_ei, heldout_x_ei, heldout_y_ei = optimize_acqf_and_get_observation(EI, - heldout_x_ei, - heldout_y_ei) + new_x_ei, new_obj_ei, heldout_x_ei, heldout_y_ei = ( + optimize_acqf_and_get_observation(EI, heldout_x_ei, heldout_y_ei) + ) # update training points train_x_ei = torch.cat([train_x_ei, new_x_ei]) train_y_ei = torch.cat([train_y_ei, new_obj_ei]) # update random search progress - best_random, heldout_x_random, heldout_y_random = update_random_observations(best_random, - heldout_inputs=heldout_x_random, - heldout_outputs=heldout_y_random) + best_random, heldout_x_random, heldout_y_random = ( + update_random_observations( + best_random, + heldout_inputs=heldout_x_random, + heldout_outputs=heldout_y_random, + ) + ) best_value_ei = torch.max(new_obj_ei, best_observed_ei[-1]) best_observed_ei.append(best_value_ei.squeeze()) @@ -154,7 +184,8 @@ def run_training_loop(initialize_model, n_trials, n_iters, holdout_size, X, y, v print( f"\nBatch {iteration:>2}: best_value (random, qEI) = " f"({max(best_random):>4.2f}, {best_value_ei:>4.2f}), " - f"time = {t1 - t0:>4.2f}.", end="" + f"time = {t1 - t0:>4.2f}.", + end="", ) else: print(".", end="") @@ -166,36 +197,50 @@ def run_training_loop(initialize_model, n_trials, n_iters, holdout_size, X, y, v best_random_all.append(best_random) return best_observed_all_ei, best_random_all -def evaluate_model(initialize_model, n_trials, n_iters, test_set_size, X, y, figure_path): + +def evaluate_model( + initialize_model, n_trials, n_iters, test_set_size, X, y, figure_path +): # initialise performance metric lists r2_list = [] rmse_list = [] mae_list = [] warnings.filterwarnings("ignore") - warnings.filterwarnings('ignore', category=RuntimeWarning) - + warnings.filterwarnings("ignore", category=RuntimeWarning) + # We pre-allocate array for plotting confidence-error curves - _, _, _, y_test = train_test_split(X, y, test_size=test_set_size) # To get test set size + _, _, _, y_test = train_test_split( + X, y, test_size=test_set_size + ) # To get test set size n_test = len(y_test) mae_confidence_list = np.zeros((n_trials, n_test)) - - print('\nBeginning training loop...') + + print("\nBeginning training loop...") for i in range(0, n_trials): - - print(f'Starting trial {i}') - - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_set_size, random_state=i) + + print(f"Starting trial {i}") + + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=test_set_size, random_state=i + ) # We standardise the outputs - _, y_train, _, y_test, y_scaler = transform_data(X_train, y_train, X_test, y_test) + _, y_train, _, y_test, y_scaler = transform_data( + X_train, y_train, X_test, y_test + ) # Convert numpy arrays to PyTorch tensors and flatten the label vectors - #print(f'types {type(X_train)}') + # print(f'types {type(X_train)}') y_train = torch.tensor(y_train.astype(np.float64)).flatten() y_test = torch.tensor(y_test.astype(np.float64)).flatten() - assert isinstance(X_train, torch.Tensor) and isinstance(y_train, torch.Tensor) and isinstance(X_test, torch.Tensor) and isinstance(y_test, torch.Tensor) + assert ( + isinstance(X_train, torch.Tensor) + and isinstance(y_train, torch.Tensor) + and isinstance(X_test, torch.Tensor) + and isinstance(y_test, torch.Tensor) + ) likelihood = GaussianLikelihood() # initialise GP likelihood and model @@ -223,7 
+268,7 @@ def evaluate_model(initialize_model, n_trials, n_iters, test_set_size, X, y, fig # to make compatible with inverse_transform in scikit-learn version > 1 y_pred = y_scaler.inverse_transform(y_pred.detach().unsqueeze(dim=1)) y_test = y_scaler.inverse_transform(y_test.detach().unsqueeze(dim=1)) - + # Compute scores for confidence curve plotting. ranked_confidence_list = np.argsort(y_var.detach(), axis=0).flatten() @@ -232,7 +277,7 @@ def evaluate_model(initialize_model, n_trials, n_iters, test_set_size, X, y, fig # Construct the MAE error for each level of confidence - conf = ranked_confidence_list[0:k+1] + conf = ranked_confidence_list[0 : k + 1] mae = mean_absolute_error(y_test[conf], y_pred[conf]) mae_confidence_list[i, k] = mae @@ -240,8 +285,12 @@ def evaluate_model(initialize_model, n_trials, n_iters, test_set_size, X, y, fig y_train = y_train.detach() y_pred_train = model(X_train).mean.detach() train_rmse_stan = np.sqrt(mean_squared_error(y_train, y_pred_train)) - train_rmse = np.sqrt(mean_squared_error(y_scaler.inverse_transform(y_train.unsqueeze(dim=1)), - y_scaler.inverse_transform(y_pred_train.unsqueeze(dim=1)))) + train_rmse = np.sqrt( + mean_squared_error( + y_scaler.inverse_transform(y_train.unsqueeze(dim=1)), + y_scaler.inverse_transform(y_pred_train.unsqueeze(dim=1)), + ) + ) # Compute R^2, RMSE and MAE on Test set score = r2_score(y_test, y_pred) @@ -251,14 +300,14 @@ def evaluate_model(initialize_model, n_trials, n_iters, test_set_size, X, y, fig r2_list.append(score) rmse_list.append(rmse) mae_list.append(mae) - + r2_list = np.array(r2_list) rmse_list = np.array(rmse_list) mae_list = np.array(mae_list) # Plot confidence-error curves # 1e-14 instead of 0 to for numerical reasons! - confidence_percentiles = np.arange(1e-14, 100, 100/len(y_test)) + confidence_percentiles = np.arange(1e-14, 100, 100 / len(y_test)) # We plot the Mean-absolute error confidence-error curves @@ -273,28 +322,36 @@ def evaluate_model(initialize_model, n_trials, n_iters, test_set_size, X, y, fig lower = mae_mean - mae_std upper = mae_mean + mae_std - plt.plot(confidence_percentiles, mae_mean, label='mean') + plt.plot(confidence_percentiles, mae_mean, label="mean") plt.fill_between(confidence_percentiles, lower, upper, alpha=0.2) - plt.xlabel('Confidence Percentile') - plt.ylabel('MAE (nm)') + plt.xlabel("Confidence Percentile") + plt.ylabel("MAE (nm)") plt.ylim([0, np.max(upper) + 1]) plt.xlim([0, 100 * ((len(y_test) - 1) / len(y_test))]) plt.yticks(np.arange(0, np.max(upper) + 1, 5.0)) plt.savefig(figure_path) - - return r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std - + return r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std -def evaluate_graph_model(initialize_model, X, y, test_set_size, n_trials, n_iters, figure_path='', kernel=WeisfeilerLehmanKernel, **kernel_kwargs) -> Tuple[np.ndarray, np.ndarray]: +def evaluate_graph_model( + initialize_model, + X, + y, + test_set_size, + n_trials, + n_iters, + figure_path="", + kernel=WeisfeilerLehmanKernel, + **kernel_kwargs, +) -> Tuple[np.ndarray, np.ndarray]: # Initialise performance metric lists r2_list = [] rmse_list = [] mae_list = [] warnings.filterwarnings("ignore") - warnings.filterwarnings('ignore', category=RuntimeWarning) + warnings.filterwarnings("ignore", category=RuntimeWarning) # We pre-allocate array for plotting confidence-error curves n_test = int(len(y) * test_set_size) + 1 @@ -320,7 +377,9 @@ def evaluate_graph_model(initialize_model, X, y, test_set_size, n_trials, n_iter # 
Initialise GP likelihood and model
         likelihood = GaussianLikelihood()
-        model = initialize_model(X_train, y_train, likelihood, kernel, node_label="element")
+        model = initialize_model(
+            X_train, y_train, likelihood, kernel, node_label="element"
+        )
 
         # Define the marginal log likelihood used to optimise the model hyperparameters
         mll = ExactMarginalLogLikelihood(likelihood, model)
diff --git a/experiments/kernels.py b/experiments/kernels.py
index 0e027aa..8a3c581 100644
--- a/experiments/kernels.py
+++ b/experiments/kernels.py
@@ -9,6 +9,7 @@
 from gpytorch.kernels import Kernel
 import torch
 
+
 class TanimotoKernel(Kernel):
     is_stationary = False
     has_lengthscale = False
@@ -16,17 +17,20 @@ class TanimotoKernel(Kernel):
     def __init__(self, **kwargs):
         super(TanimotoKernel, self).__init__(**kwargs)
 
-    def batch_tanimoto_sim(self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
+    def batch_tanimoto_sim(
+        self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6
+    ) -> torch.Tensor:
         if x1.ndim < 2 or x2.ndim < 2:
             raise ValueError("Tensors must have a batch dimension")
         dot_prod = torch.matmul(x1, torch.transpose(x2, -1, -2))
-        x1_norm = torch.sum(x1 ** 2, dim=-1, keepdims=True)
-        x2_norm = torch.sum(x2 ** 2, dim=-1, keepdims=True)
+        x1_norm = torch.sum(x1**2, dim=-1, keepdims=True)
+        x2_norm = torch.sum(x2**2, dim=-1, keepdims=True)
         tan_similarity = (dot_prod + eps) / (
-                eps + x1_norm + torch.transpose(x2_norm, -1, -2) - dot_prod
+            eps + x1_norm + torch.transpose(x2_norm, -1, -2) - dot_prod
         )
-        return tan_similarity.clamp_min_(0)  # zero out negative values for numerical stability
-
+        return tan_similarity.clamp_min_(
+            0
+        )  # zero out negative values for numerical stability
 
     def forward(self, x1, x2, diag=False, **params):
         if diag:
@@ -37,17 +41,20 @@ def forward(self, x1, x2, diag=False, **params):
         else:
             return self.covar_dist(x1, x2, **params)
 
-    def covar_dist(self,x1,x2,last_dim_is_batch=False,**params):
+    def covar_dist(self, x1, x2, last_dim_is_batch=False, **params):
         if last_dim_is_batch:
             x1 = x1.transpose(-1, -2).unsqueeze(-1)
             x2 = x2.transpose(-1, -2).unsqueeze(-1)
         return self.batch_tanimoto_sim(x1, x2)
 
+
 class WalkKernel(Kernel):
     def __init__(self, **kwargs):
         super(WalkKernel, self).__init__(**kwargs)
-
-    def batch_walk_sim(self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
+
+    def batch_walk_sim(
+        self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6
+    ) -> torch.Tensor:
 
         return
@@ -55,7 +62,9 @@ def batch_walk_sim(self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6)
 class GraphKernel(Kernel):
     def __init__(self, **kwargs):
         super(GraphKernel, self).__init__(**kwargs)
-
-    def batch_graph_sim(self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
-        return
\ No newline at end of file
+    def batch_graph_sim(
+        self, x1: torch.Tensor, x2: torch.Tensor, eps: float = 1e-6
+    ) -> torch.Tensor:
+
+        return
diff --git a/experiments/lipophilicity_experiment.py b/experiments/lipophilicity_experiment.py
index 11a0c4a..71ee58a 100644
--- a/experiments/lipophilicity_experiment.py
+++ b/experiments/lipophilicity_experiment.py
@@ -5,10 +5,10 @@
 import numpy as np
 from load_process_data import LoadDatasetForTask
 
-#botorch specific
+# botorch specific
 from botorch.models.gp_regression import ExactGP
 
-#gpytorch specific
+# gpytorch specific
 from gpytorch.means import ConstantMean
 from gpytorch.kernels import ScaleKernel, RBFKernel
 from gpytorch.distributions import MultivariateNormal
@@ -25,8 +25,8 @@ if 
torch.cuda.is_available(): dev = "cuda:0" torch.cuda.empty_cache() -else: - dev = "cpu" +else: + dev = "cpu" device = torch.device(dev) @@ -41,6 +41,7 @@ def forward(self, x): covar_x = self.covar_module(x) return MultivariateNormal(mean_x, covar_x) + class GraphGP(SIGP): def __init__(self, train_x, train_y, likelihood, kernel, **kernel_kwargs): super().__init__(train_x, train_y, likelihood) @@ -61,67 +62,119 @@ def forward(self, x): return MultivariateNormal(mean, covariance) -def initialize_model(train_x:torch.Tensor, train_obj:torch.Tensor, likelihood): +def initialize_model(train_x: torch.Tensor, train_obj: torch.Tensor, likelihood): model = ExactGPModel(train_x, train_obj, likelihood).to(train_x) return model + def initialize_graph_gp(train_x, train_obj, likelihood, kernel, **kernel_kwargs): - model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) + model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) return model + def one_experiment(target, encoding, n_trials, n_iters): - X,y = [], [] - if encoding == 'complexes': - X,y = LoadDatasetForTask(X='dataset/lipophilicity/fast_complex_lookup_repn.pkl', y='dataset/lipophilicity/Lipophilicity.csv', repn=encoding, y_column=target).load_lipophilicity() - elif encoding == 'deep_complexes': - X,y = LoadDatasetForTask(X='dataset/lipophilicity/deep_complex_lookup_repn.pkl',y='dataset/lipophilicity/Lipophilicity.csv',repn=encoding, y_column=target).load_lipophilicity() - elif ENCODING == 'fingerprints': - X,y = LoadDatasetForTask(X='gauche_ecfp', y='dataset/lipophilicity/Lipophilicity.csv', repn=encoding, y_column=target).load_lipophilicity() - elif ENCODING == 'SELFIES': - X,y = LoadDatasetForTask(X='gauche_selfies', y='dataset/lipophilicity/Lipophilicity.csv', repn=encoding, y_column=target).load_lipophilicity() - elif ENCODING == 'GRAPHS': - X,y = LoadDatasetForTask(X='gauche_graphs', y='dataset/lipophilicity/Lipophilicity.csv', repn=encoding, y_column=target).load_lipophilicity() - - if ENCODING != 'GRAPHS': - r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_model(initialize_model=initialize_model, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png') + X, y = [], [] + if encoding == "complexes": + X, y = LoadDatasetForTask( + X="dataset/lipophilicity/fast_complex_lookup_repn.pkl", + y="dataset/lipophilicity/Lipophilicity.csv", + repn=encoding, + y_column=target, + ).load_lipophilicity() + elif encoding == "deep_complexes": + X, y = LoadDatasetForTask( + X="dataset/lipophilicity/deep_complex_lookup_repn.pkl", + y="dataset/lipophilicity/Lipophilicity.csv", + repn=encoding, + y_column=target, + ).load_lipophilicity() + elif ENCODING == "fingerprints": + X, y = LoadDatasetForTask( + X="gauche_ecfp", + y="dataset/lipophilicity/Lipophilicity.csv", + repn=encoding, + y_column=target, + ).load_lipophilicity() + elif ENCODING == "SELFIES": + X, y = LoadDatasetForTask( + X="gauche_selfies", + y="dataset/lipophilicity/Lipophilicity.csv", + repn=encoding, + y_column=target, + ).load_lipophilicity() + elif ENCODING == "GRAPHS": + X, y = LoadDatasetForTask( + X="gauche_graphs", + y="dataset/lipophilicity/Lipophilicity.csv", + repn=encoding, + y_column=target, + ).load_lipophilicity() + + if ENCODING != "GRAPHS": + r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = ( + evaluate_model( + initialize_model=initialize_model, + n_trials=n_trials, + 
n_iters=n_iters, + test_set_size=holdout_set_size, + X=X, + y=y, + figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png", + ) + ) else: - r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_graph_model(initialize_graph_gp, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png') - - mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format(np.mean(r2_list), np.std(r2_list)/np.sqrt(len(r2_list))) - mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format(np.mean(rmse_list), np.std(rmse_list)/np.sqrt(len(rmse_list))) - mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format(np.mean(mae_list), np.std(mae_list)/np.sqrt(len(mae_list))) + r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = ( + evaluate_graph_model( + initialize_graph_gp, + n_trials=n_trials, + n_iters=n_iters, + test_set_size=holdout_set_size, + X=X, + y=y, + figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png", + ) + ) + + mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format( + np.mean(r2_list), np.std(r2_list) / np.sqrt(len(r2_list)) + ) + mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format( + np.mean(rmse_list), np.std(rmse_list) / np.sqrt(len(rmse_list)) + ) + mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format( + np.mean(mae_list), np.std(mae_list) / np.sqrt(len(mae_list)) + ) return mean_r2, mean_rmse, mean_mae -if __name__ == '__main__': - EXPERIMENT_TYPE = 'Lipophilicity' - ENCODING = 'GRAPHS' +if __name__ == "__main__": + EXPERIMENT_TYPE = "Lipophilicity" + ENCODING = "GRAPHS" N_TRIALS = 20 N_ITERS = 5 holdout_set_size = 0.33 # dataset processing - X,y = [], [] + X, y = [], [] # dataset loading - possible_target_cols = ['exp'] + possible_target_cols = ["exp"] results = [] - + for col in possible_target_cols: - mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING,N_TRIALS,N_ITERS) + mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING, N_TRIALS, N_ITERS) results.append([col, mean_r2, mean_rmse, mean_mae]) - if type(EXPERIMENT_TYPE) is str: - trial_num = len(os.listdir(f'results/{EXPERIMENT_TYPE}')) + trial_num = len(os.listdir(f"results/{EXPERIMENT_TYPE}")) results_path = f"results/{EXPERIMENT_TYPE}/{ENCODING}_{time.time()}.txt" - with open(results_path, 'w') as f: - f.write(EXPERIMENT_TYPE + ':') - f.write('\n') - f.write(ENCODING + ':') + with open(results_path, "w") as f: + f.write(EXPERIMENT_TYPE + ":") + f.write("\n") + f.write(ENCODING + ":") for result in results: col, mean_r2, mean_rmse, mean_mae = result - f.write(f'column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}') - f.write('\n') + f.write(f"column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}") + f.write("\n") f.close() - print("CONCLUDED") \ No newline at end of file + print("CONCLUDED") diff --git a/experiments/load_process_data.py b/experiments/load_process_data.py index 454c971..e95c94a 100644 --- a/experiments/load_process_data.py +++ b/experiments/load_process_data.py @@ -5,16 +5,17 @@ from typing import Tuple from gauche.dataloader import MolPropLoader -class LoadDatasetForTask(): + +class LoadDatasetForTask: def __init__(self, X, y, y_column, repn): self.X = X self.y = y self.y_column = y_column self.repn = repn - + def load_photoswitches(self) -> Tuple[torch.Tensor, torch.Tensor]: - if self.repn == 'complexes': - with open(self.X, 'rb') as f: + if self.repn == "complexes": + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ 
-22,18 +23,27 @@ def load_photoswitches(self) -> Tuple[torch.Tensor, torch.Tensor]: t = torch.tensor(rep) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] mean_value = y.mean() y.fillna(value=mean_value, inplace=True) y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'deep_complexes': - print(f'here') - with open(self.X, 'rb') as f: + elif self.repn == "deep_complexes": + print(f"here") + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -41,24 +51,35 @@ def load_photoswitches(self) -> Tuple[torch.Tensor, torch.Tensor]: rep1 = x_data[x][1] rep0.flatten() rep1.flatten() - r = np.concatenate([rep0,rep1], axis=0) + r = np.concatenate([rep0, rep1], axis=0) t = torch.tensor(r) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] mean_value = y.mean() y.fillna(value=mean_value, inplace=True) y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'fingerprints': + elif self.repn == "fingerprints": loader = MolPropLoader() loader.validate = lambda: False - loader.load_benchmark("Photoswitch", path='dataset/photoswitches/photoswitches.csv') - loader.featurize('ecfp_fragprints') + loader.load_benchmark( + "Photoswitch", path="dataset/photoswitches/photoswitches.csv" + ) + loader.featurize("ecfp_fragprints") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) @@ -66,13 +87,19 @@ def load_photoswitches(self) -> Tuple[torch.Tensor, torch.Tensor]: mean_value = y.mean() y.fillna(value=mean_value, inplace=True) y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'SELFIES': + elif self.repn == "SELFIES": loader = MolPropLoader() loader.validate = lambda: False - loader.load_benchmark("Photoswitch", path='dataset/photoswitches/photoswitches.csv') - loader.featurize('bag_of_selfies') + loader.load_benchmark( + "Photoswitch", path="dataset/photoswitches/photoswitches.csv" + ) + loader.featurize("bag_of_selfies") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) @@ -80,13 +107,19 @@ def load_photoswitches(self) -> Tuple[torch.Tensor, torch.Tensor]: mean_value = y.mean() y.fillna(value=mean_value, inplace=True) y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, 
torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'GRAPHS': + elif self.repn == "GRAPHS": loader = MolPropLoader() loader.validate = lambda: False - loader.load_benchmark("Photoswitch", path='dataset/photoswitches/photoswitches.csv') - loader.featurize('molecular_graphs') + loader.load_benchmark( + "Photoswitch", path="dataset/photoswitches/photoswitches.csv" + ) + loader.featurize("molecular_graphs") X = loader.features ydata = pd.read_csv(self.y) y = ydata[self.y_column] @@ -95,10 +128,10 @@ def load_photoswitches(self) -> Tuple[torch.Tensor, torch.Tensor]: y = torch.tensor(y.values).view(len(y), 1) assert len(X) == len(y) and isinstance(y, torch.Tensor) return tuple([X, y]) - + def load_esol(self) -> Tuple[torch.Tensor, torch.Tensor]: - if self.repn == 'complexes': - with open(self.X, 'rb') as f: + if self.repn == "complexes": + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -106,16 +139,25 @@ def load_esol(self) -> Tuple[torch.Tensor, torch.Tensor]: t = torch.tensor(rep) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'deep_complexes': - print(f'here') - with open(self.X, 'rb') as f: + elif self.repn == "deep_complexes": + print(f"here") + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -123,54 +165,70 @@ def load_esol(self) -> Tuple[torch.Tensor, torch.Tensor]: rep1 = x_data[x][1] rep0.flatten() rep1.flatten() - r = np.concatenate([rep0,rep1], axis=0) + r = np.concatenate([rep0, rep1], axis=0) t = torch.tensor(r) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'fingerprints': + elif self.repn == "fingerprints": loader = MolPropLoader() - loader.load_benchmark("ESOL", path='dataset/esol/ESOL.csv') - loader.featurize('ecfp_fragprints') + loader.load_benchmark("ESOL", path="dataset/esol/ESOL.csv") + loader.featurize("ecfp_fragprints") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'SELFIES': + elif self.repn == "SELFIES": loader = 
MolPropLoader() - loader.load_benchmark("ESOL", path='dataset/esol/ESOL.csv') - loader.featurize('bag_of_selfies') + loader.load_benchmark("ESOL", path="dataset/esol/ESOL.csv") + loader.featurize("bag_of_selfies") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'GRAPHS': + elif self.repn == "GRAPHS": loader = MolPropLoader() - loader.load_benchmark("ESOL", path='dataset/esol/ESOL.csv') - loader.featurize('molecular_graphs') + loader.load_benchmark("ESOL", path="dataset/esol/ESOL.csv") + loader.featurize("molecular_graphs") X = loader.features ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) assert len(X) == len(y) and isinstance(y, torch.Tensor) return tuple([X, y]) - def load_freesolv(self) -> Tuple[torch.Tensor, torch.Tensor]: - if self.repn == 'complexes': - with open(self.X, 'rb') as f: + if self.repn == "complexes": + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -178,16 +236,25 @@ def load_freesolv(self) -> Tuple[torch.Tensor, torch.Tensor]: t = torch.tensor(rep) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'deep_complexes': - print(f'here') - with open(self.X, 'rb') as f: + elif self.repn == "deep_complexes": + print(f"here") + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -195,53 +262,70 @@ def load_freesolv(self) -> Tuple[torch.Tensor, torch.Tensor]: rep1 = x_data[x][1] rep0.flatten() rep1.flatten() - r = np.concatenate([rep0,rep1], axis=0) + r = np.concatenate([rep0, rep1], axis=0) t = torch.tensor(r) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'fingerprints': + elif self.repn == "fingerprints": loader = MolPropLoader() - loader.load_benchmark("FreeSolv", path='dataset/free_solv/FreeSolv.csv') - loader.featurize('ecfp_fragprints') + loader.load_benchmark("FreeSolv", path="dataset/free_solv/FreeSolv.csv") + loader.featurize("ecfp_fragprints") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = 
torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'SELFIES': + elif self.repn == "SELFIES": loader = MolPropLoader() - loader.load_benchmark("FreeSolv", path='dataset/free_solv/FreeSolv.csv') - loader.featurize('bag_of_selfies') + loader.load_benchmark("FreeSolv", path="dataset/free_solv/FreeSolv.csv") + loader.featurize("bag_of_selfies") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'GRAPHS': + elif self.repn == "GRAPHS": loader = MolPropLoader() - loader.load_benchmark("FreeSolv", path='dataset/free_solv/FreeSolv.csv') - loader.featurize('molecular_graphs') + loader.load_benchmark("FreeSolv", path="dataset/free_solv/FreeSolv.csv") + loader.featurize("molecular_graphs") X = loader.features ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) assert len(X) == len(y) and isinstance(y, torch.Tensor) return tuple([X, y]) - + def load_lipophilicity(self) -> Tuple[torch.Tensor, torch.Tensor]: - if self.repn == 'complexes': - with open(self.X, 'rb') as f: + if self.repn == "complexes": + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -249,16 +333,25 @@ def load_lipophilicity(self) -> Tuple[torch.Tensor, torch.Tensor]: t = torch.tensor(rep) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'deep_complexes': - print(f'here') - with open(self.X, 'rb') as f: + elif self.repn == "deep_complexes": + print(f"here") + with open(self.X, "rb") as f: x_data = dill.load(f) X = [] for x in x_data: @@ -266,46 +359,69 @@ def load_lipophilicity(self) -> Tuple[torch.Tensor, torch.Tensor]: rep1 = x_data[x][1] rep0.flatten() rep1.flatten() - r = np.concatenate([rep0,rep1], axis=0) + r = np.concatenate([rep0, rep1], axis=0) t = torch.tensor(r) X.append(t) max_len = max([x.squeeze().numel() for x in X]) - data = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode='constant', value=0) for x in X] + data = [ + torch.nn.functional.pad( + x, pad=(0, max_len - x.numel()), mode="constant", value=0 + ) + for x in X + ] X = torch.stack(data) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'fingerprints': + elif self.repn == 
"fingerprints": loader = MolPropLoader() - loader.load_benchmark("Lipophilicity", path='dataset/lipophilicity/Lipophilicity.csv') - loader.featurize('ecfp_fragprints') + loader.load_benchmark( + "Lipophilicity", path="dataset/lipophilicity/Lipophilicity.csv" + ) + loader.featurize("ecfp_fragprints") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'SELFIES': + elif self.repn == "SELFIES": loader = MolPropLoader() - loader.load_benchmark("Lipophilicity", path='dataset/lipophilicity/Lipophilicity.csv') - loader.featurize('bag_of_selfies') + loader.load_benchmark( + "Lipophilicity", path="dataset/lipophilicity/Lipophilicity.csv" + ) + loader.featurize("bag_of_selfies") X = loader.features X = torch.from_numpy(X).type(torch.float64) ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) - assert len(X) == len(y) and isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor) + assert ( + len(X) == len(y) + and isinstance(X, torch.Tensor) + and isinstance(y, torch.Tensor) + ) return tuple([X, y]) - elif self.repn == 'GRAPHS': + elif self.repn == "GRAPHS": loader = MolPropLoader() - loader.load_benchmark("Lipophilicity", path='dataset/lipophilicity/Lipophilicity.csv') - loader.featurize('molecular_graphs') + loader.load_benchmark( + "Lipophilicity", path="dataset/lipophilicity/Lipophilicity.csv" + ) + loader.featurize("molecular_graphs") X = loader.features ydata = pd.read_csv(self.y) y = ydata[self.y_column] y = torch.tensor(y.values).view(len(y), 1) assert len(X) == len(y) and isinstance(y, torch.Tensor) - return tuple([X, y]) \ No newline at end of file + return tuple([X, y]) diff --git a/experiments/photoswitches_experiment.py b/experiments/photoswitches_experiment.py index 6505fbc..96fac08 100644 --- a/experiments/photoswitches_experiment.py +++ b/experiments/photoswitches_experiment.py @@ -5,10 +5,10 @@ import numpy as np from load_process_data import LoadDatasetForTask -#botorch specific +# botorch specific from botorch.models.gp_regression import ExactGP -#gpytorch specific +# gpytorch specific from gpytorch.means import ConstantMean from gpytorch.kernels import ScaleKernel, RBFKernel from gpytorch.distributions import MultivariateNormal @@ -25,8 +25,8 @@ if torch.cuda.is_available(): dev = "cuda:0" torch.cuda.empty_cache() -else: - dev = "cpu" +else: + dev = "cpu" device = torch.device(dev) @@ -41,6 +41,7 @@ def forward(self, x): covar_x = self.covar_module(x) return MultivariateNormal(mean_x, covar_x) + class GraphGP(SIGP): def __init__(self, train_x, train_y, likelihood, kernel, **kernel_kwargs): super().__init__(train_x, train_y, likelihood) @@ -61,69 +62,136 @@ def forward(self, x): return MultivariateNormal(mean, covariance) -def initialize_model(train_x:torch.Tensor, train_obj:torch.Tensor, likelihood): +def initialize_model(train_x: torch.Tensor, train_obj: torch.Tensor, likelihood): model = ExactGPModel(train_x, train_obj, likelihood).to(train_x) return model + def initialize_graph_gp(train_x, train_obj, likelihood, kernel, **kernel_kwargs): - model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) + model = GraphGP(train_x, train_obj, likelihood, kernel, **kernel_kwargs) return model 
+ def one_experiment(target, encoding, n_trials, n_iters): - X,y = [], [] - if encoding == 'complexes': - X,y = LoadDatasetForTask(X='dataset/photoswitches/fast_complex_lookup_repn.pkl', y='dataset/photoswitches/photoswitches.csv', repn=encoding, y_column=target).load_photoswitches() - elif encoding == 'deep_complexes': - X,y = LoadDatasetForTask(X='dataset/photoswitches/deep_complex_lookup_repn.pkl',y='dataset/photoswitches/photoswitches.csv',repn=encoding, y_column=target).load_photoswitches() - elif ENCODING == 'fingerprints': - X,y = LoadDatasetForTask(X='gauche_ecfp', y='dataset/photoswitches/photoswitches.csv', repn=encoding, y_column=target).load_photoswitches() - elif ENCODING == 'SELFIES': - X,y = LoadDatasetForTask(X='gauche_selfies', y='dataset/photoswitches/photoswitches.csv', repn=encoding, y_column=target).load_photoswitches() - elif ENCODING == 'GRAPHS': - X,y = LoadDatasetForTask(X='gauche_graphs', y='dataset/photoswitches/photoswitches.csv', repn=encoding, y_column=target).load_photoswitches() - - if ENCODING != 'GRAPHS': - r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_model(initialize_model=initialize_model, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png') + X, y = [], [] + if encoding == "complexes": + X, y = LoadDatasetForTask( + X="dataset/photoswitches/fast_complex_lookup_repn.pkl", + y="dataset/photoswitches/photoswitches.csv", + repn=encoding, + y_column=target, + ).load_photoswitches() + elif encoding == "deep_complexes": + X, y = LoadDatasetForTask( + X="dataset/photoswitches/deep_complex_lookup_repn.pkl", + y="dataset/photoswitches/photoswitches.csv", + repn=encoding, + y_column=target, + ).load_photoswitches() + elif ENCODING == "fingerprints": + X, y = LoadDatasetForTask( + X="gauche_ecfp", + y="dataset/photoswitches/photoswitches.csv", + repn=encoding, + y_column=target, + ).load_photoswitches() + elif ENCODING == "SELFIES": + X, y = LoadDatasetForTask( + X="gauche_selfies", + y="dataset/photoswitches/photoswitches.csv", + repn=encoding, + y_column=target, + ).load_photoswitches() + elif ENCODING == "GRAPHS": + X, y = LoadDatasetForTask( + X="gauche_graphs", + y="dataset/photoswitches/photoswitches.csv", + repn=encoding, + y_column=target, + ).load_photoswitches() + + if ENCODING != "GRAPHS": + r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = ( + evaluate_model( + initialize_model=initialize_model, + n_trials=n_trials, + n_iters=n_iters, + test_set_size=holdout_set_size, + X=X, + y=y, + figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png", + ) + ) else: - r2_list,rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = evaluate_graph_model(initialize_graph_gp, n_trials=n_trials, n_iters=n_iters, test_set_size=holdout_set_size, X=X, y=y, figure_path=f'results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png') - - mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format(np.mean(r2_list), np.std(r2_list)/np.sqrt(len(r2_list))) - mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format(np.mean(rmse_list), np.std(rmse_list)/np.sqrt(len(rmse_list))) - mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format(np.mean(mae_list), np.std(mae_list)/np.sqrt(len(mae_list))) + r2_list, rmse_list, mae_list, confidence_percentiles, mae_mean, mae_std = ( + evaluate_graph_model( + initialize_graph_gp, + n_trials=n_trials, + n_iters=n_iters, + test_set_size=holdout_set_size, + 
X=X, + y=y, + figure_path=f"results/{EXPERIMENT_TYPE}/confidence_mae_model_{ENCODING}_{target}.png", + ) + ) + + mean_r2 = "\nmean R^2: {:.4f} +- {:.4f}".format( + np.mean(r2_list), np.std(r2_list) / np.sqrt(len(r2_list)) + ) + mean_rmse = "mean RMSE: {:.4f} +- {:.4f}".format( + np.mean(rmse_list), np.std(rmse_list) / np.sqrt(len(rmse_list)) + ) + mean_mae = "mean MAE: {:.4f} +- {:.4f}\n".format( + np.mean(mae_list), np.std(mae_list) / np.sqrt(len(mae_list)) + ) return mean_r2, mean_rmse, mean_mae -if __name__ == '__main__': - EXPERIMENT_TYPE = 'Photoswitches' - ENCODING = 'deep_complexes' +if __name__ == "__main__": + EXPERIMENT_TYPE = "Photoswitches" + ENCODING = "deep_complexes" N_TRIALS = 20 N_ITERS = 5 holdout_set_size = 0.33 # dataset processing - X,y = [], [] + X, y = [], [] # dataset loading - possible_target_cols = ['rate of thermal isomerisation from Z-E in s-1','Z PhotoStationaryState','E PhotoStationaryState','E isomer pi-pi* wavelength in nm','Extinction','E isomer n-pi* wavelength in nm', - 'Extinction coefficient in M-1 cm-1','Z isomer pi-pi* wavelength in nm','Extinction.1','Z isomer n-pi* wavelength in nm','Extinction coefficient in M-1 cm-1.1','Wiberg index', - 'PBE0 DFT E isomer pi-pi* wavelength in nm','PBE0 DFT E isomer n-pi* wavelength in nm','PBE0 DFT Z isomer pi-pi* wavelength in nm','PBE0 DFT Z isomer n-pi* wavelength in nm'] + possible_target_cols = [ + "rate of thermal isomerisation from Z-E in s-1", + "Z PhotoStationaryState", + "E PhotoStationaryState", + "E isomer pi-pi* wavelength in nm", + "Extinction", + "E isomer n-pi* wavelength in nm", + "Extinction coefficient in M-1 cm-1", + "Z isomer pi-pi* wavelength in nm", + "Extinction.1", + "Z isomer n-pi* wavelength in nm", + "Extinction coefficient in M-1 cm-1.1", + "Wiberg index", + "PBE0 DFT E isomer pi-pi* wavelength in nm", + "PBE0 DFT E isomer n-pi* wavelength in nm", + "PBE0 DFT Z isomer pi-pi* wavelength in nm", + "PBE0 DFT Z isomer n-pi* wavelength in nm", + ] results = [] - + for col in possible_target_cols: - mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING,N_TRIALS,N_ITERS) + mean_r2, mean_rmse, mean_mae = one_experiment(col, ENCODING, N_TRIALS, N_ITERS) results.append([col, mean_r2, mean_rmse, mean_mae]) - if type(EXPERIMENT_TYPE) is str: - trial_num = len(os.listdir(f'results/{EXPERIMENT_TYPE}')) + trial_num = len(os.listdir(f"results/{EXPERIMENT_TYPE}")) results_path = f"results/{EXPERIMENT_TYPE}/{ENCODING}_{time.time()}.txt" - with open(results_path, 'w') as f: - f.write(EXPERIMENT_TYPE + ':') - f.write('\n') - f.write(ENCODING + ':') + with open(results_path, "w") as f: + f.write(EXPERIMENT_TYPE + ":") + f.write("\n") + f.write(ENCODING + ":") for result in results: col, mean_r2, mean_rmse, mean_mae = result - f.write(f'column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}') - f.write('\n') + f.write(f"column: {col}, {mean_r2}, {mean_rmse}, {mean_mae}") + f.write("\n") f.close() - print("CONCLUDED") \ No newline at end of file + print("CONCLUDED") diff --git a/src/atomic_complex.py b/src/atomic_complex.py index b562156..f40f167 100644 --- a/src/atomic_complex.py +++ b/src/atomic_complex.py @@ -5,40 +5,52 @@ from general_utils import GeneralComplexUtils -seed = np.random.randint(0,10*3) +seed = np.random.randint(0, 10 * 3) fm = np.float64(1e-15) key = jax.random.PRNGKey(seed) -class AtomComplex(): - def __init__(self, protons, neutrons, electrons, cutoff, proton_dims, neutron_dims, electron_dims, force_field=None, radial_contrib=None): + +class AtomComplex: + def __init__( + self, + 
protons, + neutrons, + electrons, + cutoff, + proton_dims, + neutron_dims, + electron_dims, + force_field=None, + radial_contrib=None, + ): self.P = protons self.N = neutrons self.E = electrons - assert isinstance(protons,int) and protons >= 0 - assert isinstance(neutrons,int) and neutrons >= 0 - assert isinstance(electrons,int) and electrons >= 0 + assert isinstance(protons, int) and protons >= 0 + assert isinstance(neutrons, int) and neutrons >= 0 + assert isinstance(electrons, int) and electrons >= 0 assert max(self.P, self.N) >= 1 assert min([self.P, self.N, self.E]) >= 0 - assert isinstance(proton_dims,list) or isinstance(proton_dims,int) - assert isinstance(neutron_dims,list) or isinstance(neutron_dims,int) - assert isinstance(electron_dims,list) or isinstance(electron_dims,int) - if isinstance(proton_dims,int): + assert isinstance(proton_dims, list) or isinstance(proton_dims, int) + assert isinstance(neutron_dims, list) or isinstance(neutron_dims, int) + assert isinstance(electron_dims, list) or isinstance(electron_dims, int) + if isinstance(proton_dims, int): assert proton_dims > 0 else: assert len(proton_dims) == 2 and min(proton_dims) >= 0 - if isinstance(neutron_dims,int): + if isinstance(neutron_dims, int): assert neutron_dims > 0 else: assert len(neutron_dims) == 2 and min(neutron_dims) >= 0 - assert isinstance(electron_dims,int) and electron_dims >= 0 - assert isinstance(cutoff,int) + assert isinstance(electron_dims, int) and electron_dims >= 0 + assert isinstance(cutoff, int) self.d_p = proton_dims self.d_n = neutron_dims self.d_e = electron_dims self.cutoff = cutoff self.DF = force_field self.DE = radial_contrib - + def custom_pad(self, arr, amt): new = [] for a in arr[0]: @@ -50,44 +62,44 @@ def custom_pad(self, arr, amt): def build_protons(self) -> List: self.AP = [] - if isinstance(self.d_p,int): + if isinstance(self.d_p, int): protons = Proton(self.d_p, self.P).build_proton() self.AP = list(protons) else: - l,r = self.d_p - for i in range(l,r+1): - protons = Proton(i, self.P).build_proton() - protons = self.custom_pad(protons, r-i) + l, r = self.d_p + for i in range(l, r + 1): + protons = Proton(i, self.P).build_proton() + protons = self.custom_pad(protons, r - i) self.AP += list(protons) return self.AP def build_neutrons(self) -> List: self.AN = [] - if isinstance(self.d_n,int): + if isinstance(self.d_n, int): neutrons = Neutron(self.d_n, self.N).build_neutron() self.AN = list(neutrons) else: - l,r = self.d_n - for i in range(l,r+1): - neutrons = Neutron(i, self.N).build_neutron() - neutrons = self.custom_pad(neutrons, r-i) + l, r = self.d_n + for i in range(l, r + 1): + neutrons = Neutron(i, self.N).build_neutron() + neutrons = self.custom_pad(neutrons, r - i) self.AN += list(neutrons) return self.AN - + def build_electrons(self) -> List: self.AE = [] - if isinstance(self.d_e,int): - electrons = Electron(self.d_e, self.E).build_electron() + if isinstance(self.d_e, int): + electrons = Electron(self.d_e, self.E).build_electron() self.AE = list(electrons) else: - l,r = self.d_e - for i in range(l,r+1): - electrons = Electron(i, self.E).build_electron() - electrons = self.custom_pad(electrons, r-i) + l, r = self.d_e + for i in range(l, r + 1): + electrons = Electron(i, self.E).build_electron() + electrons = self.custom_pad(electrons, r - i) self.AE += list(electrons) self.AE = [self.AE] return self.AE - + def fast_build_complex(self, using_distances=False, update_distances=None) -> Tuple: self.build_protons() self.build_neutrons() @@ -108,22 +120,23 @@ def 
fast_build_complex(self, using_distances=False, update_distances=None) -> Tu E = [] while self.AE: e, we = self.AE.pop() - if using_distances and hasattr(update_distances, '__call__'): + if using_distances and hasattr(update_distances, "__call__"): update_distances(self.DE, we, e) E.append(e) - + P = np.asarray(P) N = np.asarray(N) E = np.asarray(E) K = np.union1d(P, N) K = np.union1d(K, E) return tuple([K, self.AE, self.DF, self.DE]) - - def general_build_complex(self, using_distances=False, update_distances=None) -> Tuple: + def general_build_complex( + self, using_distances=False, update_distances=None + ) -> Tuple: if isinstance(self.d_p, list) or isinstance(self.d_n, list): - print('this will take too long - stopping early') - raise AssertionError('invalid input') + print("this will take too long - stopping early") + raise AssertionError("invalid input") gcu = GeneralComplexUtils(self.cutoff) self.build_protons() self.build_neutrons() @@ -133,7 +146,9 @@ def general_build_complex(self, using_distances=False, update_distances=None) -> while self.AP: p = self.AP.pop() p = np.asarray(p, dtype=np.float32) - res = gcu.get_nsphere_and_sampled_boundary(radius=1*fm, center=p[0], sample=10000, eps=1e-19) + res = gcu.get_nsphere_and_sampled_boundary( + radius=1 * fm, center=p[0], sample=10000, eps=1e-19 + ) nsphere, boundary = res n_sphere = np.asarray(nsphere) gluing_map = gcu.nuclear_force_map(boundary, boundary, K) @@ -142,7 +157,9 @@ def general_build_complex(self, using_distances=False, update_distances=None) -> while self.AN: n = self.AN.pop() n = np.asarray(n, dtype=np.float32) - res = gcu.get_nsphere_and_sampled_boundary(radius=0.8*fm, center=n[0], sample=10000, eps=1e-19) + res = gcu.get_nsphere_and_sampled_boundary( + radius=0.8 * fm, center=n[0], sample=10000, eps=1e-19 + ) nsphere, boundary = res n_sphere = np.asarray(nsphere) gluing_map = gcu.nuclear_force_map(boundary, boundary, K) @@ -150,42 +167,48 @@ def general_build_complex(self, using_distances=False, update_distances=None) -> while self.AE: e, we = self.AE.pop() - if using_distances and hasattr(update_distances, '__call__'): + if using_distances and hasattr(update_distances, "__call__"): update_distances(self.DE, we, e) ec = np.array([e[0] for _ in range(len(n[0]))]) - res = gcu.get_nsphere_and_sampled_boundary(radius=2.8*fm, center=ec, sample=10000, eps=1e-19) + res = gcu.get_nsphere_and_sampled_boundary( + radius=2.8 * fm, center=ec, sample=10000, eps=1e-19 + ) nsphere, boundary = res - unpack =[j[0] for i in nsphere for j in i if isinstance(j,np.ndarray)] + [nsphere[-1][-1]] + unpack = [j[0] for i in nsphere for j in i if isinstance(j, np.ndarray)] + [ + nsphere[-1][-1] + ] n_sphere = np.asarray(unpack) gluing_map = gcu.nuclear_force_map(boundary, boundary, K) K = gcu.topological_disjoint_union(gluing_map, K, n_sphere) return tuple([K, self.AE, self.DF, self.DE]) + def sanity_test(kind, *args): p, n, e, c, pd, nd, ed = args ac = AtomComplex(p, n, e, c, pd, nd, ed) - if kind == 'general': + if kind == "general": try: ac.general_build_complex() - print(f'success ✅') + print(f"success ✅") except: - print('failed ❌') + print("failed ❌") else: try: ac.fast_build_complex() - print(f'success ✅') + print(f"success ✅") except: - print('failed ❌') - -if __name__ == '__main__': - sanity_test('',1,1,1,5,3,3,0) - sanity_test('',2,1,2,5,3,3,0) - sanity_test('',1,0,1,1,[1,3],[1,6],9) - sanity_test('',0,12,12,5,[1,3],[1,3],0) - sanity_test('',1,12,0,5,[1,3],[1,3],0) - print('*'*10+'fast done'+'*'*10) - 
sanity_test('general',1,1,1,5,3,3,0) - sanity_test('general',2,1,2,5,3,7,0) - sanity_test('general',12,1,2,17,9,9,0) - print('*'*10+'general done'+'*'*10) \ No newline at end of file + print("failed ❌") + + +if __name__ == "__main__": + sanity_test("", 1, 1, 1, 5, 3, 3, 0) + sanity_test("", 2, 1, 2, 5, 3, 3, 0) + sanity_test("", 1, 0, 1, 1, [1, 3], [1, 6], 9) + sanity_test("", 0, 12, 12, 5, [1, 3], [1, 3], 0) + sanity_test("", 1, 12, 0, 5, [1, 3], [1, 3], 0) + print("*" * 10 + "fast done" + "*" * 10) + sanity_test("general", 1, 1, 1, 5, 3, 3, 0) + sanity_test("general", 2, 1, 2, 5, 3, 7, 0) + sanity_test("general", 12, 1, 2, 17, 9, 9, 0) + print("*" * 10 + "general done" + "*" * 10) diff --git a/src/build_atom_lookup.py b/src/build_atom_lookup.py index dc54f87..bf12fe3 100644 --- a/src/build_atom_lookup.py +++ b/src/build_atom_lookup.py @@ -5,50 +5,52 @@ from collections import defaultdict from atomic_complex import AtomComplex -class BuildAtoms(): + +class BuildAtoms: def __init__(self): - self.cwd = os.getcwd() - self.datapath = self.cwd + '/dataset/construct' - + self.cwd = os.getcwd() + self.datapath = self.cwd + "/dataset/construct" + def build_lookup_table(self) -> None: - assert 'lookup_map.json' in os.listdir(self.datapath) - with open(self.datapath+'/lookup_map.json') as data: + assert "lookup_map.json" in os.listdir(self.datapath) + with open(self.datapath + "/lookup_map.json") as data: d = json.load(data) - + lookup_table = defaultdict(list) for i, element in enumerate(d): n_protons, n_neutrons, n_electrons = d[element] ac = AtomComplex(n_protons, n_neutrons, n_electrons, 5, 3, 3, 0) complex = ac.fast_build_complex() - print(f'finished {i}') + print(f"finished {i}") lookup_table[element] = complex - - with open(self.datapath+'/atom_lookup.pkl', 'wb') as f: + + with open(self.datapath + "/atom_lookup.pkl", "wb") as f: dill.dump(lookup_table, f) - + return None - + def sanity(self): - lookup = self.datapath+'/atom_lookup.pkl' - with open(lookup, 'rb') as f: + lookup = self.datapath + "/atom_lookup.pkl" + with open(lookup, "rb") as f: table = dill.load(f) try: assert len(table.keys()) == 118 - print(table['He']) - print(table['Os']) - print(table['Bk']) - assert isinstance(table['He'], tuple) - assert isinstance(table['He'][0], np.ndarray) - assert isinstance(table['Os'], tuple) - assert isinstance(table['Os'][0], np.ndarray) - assert isinstance(table['Bk'], tuple) - assert isinstance(table['Bk'][0], np.ndarray) - print('Success ✅') + print(table["He"]) + print(table["Os"]) + print(table["Bk"]) + assert isinstance(table["He"], tuple) + assert isinstance(table["He"][0], np.ndarray) + assert isinstance(table["Os"], tuple) + assert isinstance(table["Os"][0], np.ndarray) + assert isinstance(table["Bk"], tuple) + assert isinstance(table["Bk"][0], np.ndarray) + print("Success ✅") except: - print('Failed ❌') + print("Failed ❌") + -if __name__ == '__main__': +if __name__ == "__main__": build = BuildAtoms() - #build.build_lookup_table() - build.sanity() \ No newline at end of file + # build.build_lookup_table() + build.sanity() diff --git a/src/building_blocks.py b/src/building_blocks.py index 545ccd9..3ae2ce5 100644 --- a/src/building_blocks.py +++ b/src/building_blocks.py @@ -3,27 +3,28 @@ import jax.numpy as jnp from typing import Tuple -seed = np.random.randint(0,10*3) +seed = np.random.randint(0, 10 * 3) fm = np.float64(1e-15) key = jax.random.PRNGKey(seed) -class Electron(): + +class Electron: def __init__(self, dim, num_pts=1): self.num_pts = num_pts - self.r = np.random.randint(0,5) 
- self.points = jax.random.normal(key*self.r, shape=(dim+1,num_pts)) - self.d = 1/(dim+1) + self.r = np.random.randint(0, 5) + self.points = jax.random.normal(key * self.r, shape=(dim + 1, num_pts)) + self.d = 1 / (dim + 1) self.cutoff = 2.8 * fm self.ee = [] self.w = [] - + def build_electron(self) -> Tuple[np.array, np.array]: for i in range(self.num_pts): v = self.points[:, i] length = jnp.linalg.norm(v) if length >= self.cutoff: r = np.random.rand() ** self.d - v = jnp.multiply(v,r*self.cutoff) + v = jnp.multiply(v, r * self.cutoff) self.ee.append(v) wv = self.wave_fn() self.w.append(wv) @@ -31,63 +32,73 @@ def build_electron(self) -> Tuple[np.array, np.array]: self.w = np.array(self.w) assert len(self.ee) == len(self.w) return tuple([self.ee, self.w]) - - def wave_fn(self,pos=0,mom=0,sigma=0.2): + + def wave_fn(self, pos=0, mom=0, sigma=0.2): randc = np.random.rand() - return lambda x: jnp.linalg.norm(np.exp(-1j*mom*x)*np.exp(-np.square(x-pos)/sigma/sigma,dtype=complex)) + randc + return ( + lambda x: jnp.linalg.norm( + np.exp(-1j * mom * x) + * np.exp(-np.square(x - pos) / sigma / sigma, dtype=complex) + ) + + randc + ) + -class Proton(): +class Proton: def __init__(self, dim, num_pts=1): self.dim = dim assert self.dim > 0 self.num_pts = num_pts - self.r = np.random.randint(0,100) - self.points = jax.random.normal(key*self.r, shape=(dim,num_pts)) # output shape determined by shape param ; key param is random seed state - self.d = 1/(dim) - self.cutoff = 1 * fm # hardcode cutoff + self.r = np.random.randint(0, 100) + self.points = jax.random.normal( + key * self.r, shape=(dim, num_pts) + ) # output shape determined by shape param ; key param is random seed state + self.d = 1 / (dim) + self.cutoff = 1 * fm # hardcode cutoff self.pd = [] - + def build_proton(self) -> Tuple[np.array]: for i in range(self.num_pts): v = self.points[:, i] length = jnp.linalg.norm(v) if length >= self.cutoff: r = np.random.rand() ** self.d - v = jnp.multiply(v,r*self.cutoff) + v = jnp.multiply(v, r * self.cutoff) self.pd.append(v) self.pd = np.unique(self.pd, axis=0) assert len(self.pd) == self.num_pts return tuple([self.pd]) -class Neutron(): + +class Neutron: def __init__(self, dim, num_pts=1): self.dim = dim assert self.dim > 0 self.num_pts = num_pts - self.r = np.random.randint(0,100) - self.points = jax.random.normal(key*self.r, shape=(dim,num_pts)) - self.d = 1/(dim) + self.r = np.random.randint(0, 100) + self.points = jax.random.normal(key * self.r, shape=(dim, num_pts)) + self.d = 1 / (dim) self.cutoff = 0.8 * fm self.nd = [] - + def build_neutron(self) -> Tuple[np.array]: for i in range(self.num_pts): v = self.points[:, i] length = jnp.linalg.norm(v) if length >= self.cutoff: r = np.random.rand() ** self.d - v = jnp.multiply(v,r*self.cutoff) + v = jnp.multiply(v, r * self.cutoff) self.nd.append(v) self.nd = np.unique(self.nd, axis=0) assert len(self.nd) == self.num_pts return tuple([self.nd]) -if __name__ == '__main__': +if __name__ == "__main__": print(fm) e = Electron(0).build_electron() print(e) n = Neutron(3, 5).build_neutron() print(n) p = Proton(3, 10).build_proton() - print(p) \ No newline at end of file + print(p) diff --git a/src/core_utils.py b/src/core_utils.py index ff83340..ccadc33 100644 --- a/src/core_utils.py +++ b/src/core_utils.py @@ -5,6 +5,7 @@ from typing import Tuple from collections import defaultdict + class GluingMap: def __init__(self, boundary1, boundary2, target): self.b1 = boundary1 @@ -13,7 +14,7 @@ def __init__(self, boundary1, boundary2, target): self.checker() 
self.construct_map() self.map = self.construct_map() - + def checker(self) -> None: assert type(self.b1) is np.ndarray assert type(self.b2) is np.ndarray @@ -24,7 +25,7 @@ def checker(self) -> None: assert len(self.target.shape) > 0 and len(self.target) >= 0 assert self.target.shape[0] >= 0 - def cartesian_product_recursive(self,*arrays, out=None): + def cartesian_product_recursive(self, *arrays, out=None): arrays = [np.asarray(x) for x in arrays] dtype = arrays[0].dtype @@ -33,13 +34,13 @@ def cartesian_product_recursive(self,*arrays, out=None): out = np.zeros([n, len(arrays)], dtype=dtype) m = n // arrays[0].size - out[:,0] = np.repeat(arrays[0], m) + out[:, 0] = np.repeat(arrays[0], m) if arrays[1:]: - self.cartesian_product_recursive(arrays[1:], out=out[0:m,1:]) + self.cartesian_product_recursive(arrays[1:], out=out[0:m, 1:]) for j in range(1, arrays[0].size): - out[j*m:(j+1)*m,1:] = out[0:m,1:] + out[j * m : (j + 1) * m, 1:] = out[0:m, 1:] return out - + def construct_map(self): d = defaultdict(np.ndarray) product = self.cartesian_product_recursive(self.b1, self.b2) @@ -49,17 +50,18 @@ def construct_map(self): d[key] = value return d + class MiscUtilFunctions: def __init__(self, valid=True): self.valid = valid self.num_fns = 1 - self.fns = ['extract_defaultdict'] - self.version = '1.0' - - def extract_defaultdict(self, str_data:str, dtype:str='str') -> defaultdict: + self.fns = ["extract_defaultdict"] + self.version = "1.0" + + def extract_defaultdict(self, str_data: str, dtype: str = "str") -> defaultdict: p = re.compile(r"^defaultdict\(") c = p.findall(str_data)[0] - defdict = eval(str_data.replace(""% c, dtype)) + defdict = eval(str_data.replace("" % c, dtype)) assert type(defdict) is defaultdict assert len(defdict) > 0 key_test = list(defdict.keys())[0] @@ -71,29 +73,29 @@ class ElectronField: def __init__(self): self.efield = np.array([]) self.waves = np.array([]) - + def get_efield(self) -> np.ndarray: return self.efield - + def get_waves(self) -> np.ndarray: return self.waves - - def add_electrons(self, electrons:np.ndarray, waves:np.ndarray): + + def add_electrons(self, electrons: np.ndarray, waves: np.ndarray): if len(waves) > len(electrons): - waves = waves[:len(electrons)] + waves = waves[: len(electrons)] elif len(waves) < len(electrons): - waves = waves + waves[:len(electrons)-len(waves)] + waves = waves + waves[: len(electrons) - len(waves)] assert len(waves) == len(electrons) assert type(electrons) is np.ndarray and type(waves) is np.ndarray self.efield = np.append(self.efield, electrons) self.waves = np.append(self.waves, waves) assert len(self.efield) == len(self.waves) - - def get_index(self, i:int,j:int) -> int: + + def get_index(self, i: int, j: int) -> int: assert type(i) is int and type(j) is int and i > 0 and j > 0 - index = (i+j)%len(self.efield) + index = (i + j) % len(self.efield) return index - def get_ij_wave_electron(self, i:int, j:int) -> Tuple[np.ndarray, np.ndarray]: - index = self.get_index(i,j) - return tuple([self.efield[index], self.waves[index]]) \ No newline at end of file + def get_ij_wave_electron(self, i: int, j: int) -> Tuple[np.ndarray, np.ndarray]: + index = self.get_index(i, j) + return tuple([self.efield[index], self.waves[index]]) diff --git a/src/general_utils.py b/src/general_utils.py index 61e850c..ad85e66 100644 --- a/src/general_utils.py +++ b/src/general_utils.py @@ -7,60 +7,68 @@ from microstructpy.geometry.n_sphere import NSphere from core_utils import GluingMap + class GeneralComplexUtils: def __init__(self, cutoff): self.cutoff 
= cutoff + def nuclear_force_map(self, boundary_pq, boundary_y, target) -> list: G = GluingMap(boundary_pq, boundary_y, target) return G.map - - def topological_disjoint_union(self, gluing_map:defaultdict, skeleton:np.ndarray, particle:np.ndarray): + + def topological_disjoint_union( + self, gluing_map: defaultdict, skeleton: np.ndarray, particle: np.ndarray + ): for k in gluing_map.keys(): k = np.asarray(k) skeleton = np.concatenate([skeleton, k]) p = particle.ravel() K = np.concatenate([skeleton, p]) return K - - def union(self,X:np.ndarray,Y:np.ndarray) -> np.ndarray: - return np.concatenate((X,Y), axis=0) - - def random_element_pop(self,L:List[List[int]]) -> list: - i = random.randrange(len(L)) # get random index - L[i], L[-1] = L[-1], L[i] # swap with the last element + + def union(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray: + return np.concatenate((X, Y), axis=0) + + def random_element_pop(self, L: List[List[int]]) -> list: + i = random.randrange(len(L)) # get random index + L[i], L[-1] = L[-1], L[i] # swap with the last element x = L.pop() return x - - def get_nsphere_and_limits(self, radius:np.float32, center:np.ndarray) -> Tuple[list, list]: + + def get_nsphere_and_limits( + self, radius: np.float32, center: np.ndarray + ) -> Tuple[list, list]: representation = NSphere(r=radius, center=center) nsphere = representation.approximate() boundary = representation.limits return Tuple[nsphere, boundary] - def centered_2_norm(self, X:np.array, center:np.array) -> np.float32: - return np.linalg.norm(X-center) + def centered_2_norm(self, X: np.array, center: np.array) -> np.float32: + return np.linalg.norm(X - center) - def get_nsphere_and_sampled_boundary(self, radius:np.float32, center:np.ndarray, sample:int, eps:np.float32) -> [list, np.ndarray]: + def get_nsphere_and_sampled_boundary( + self, radius: np.float32, center: np.ndarray, sample: int, eps: np.float32 + ) -> [list, np.ndarray]: representation = NSphere(r=radius, center=center) nsphere = representation.approximate() dim = center.shape[0] boundary = set() - def generate_normal(sample:int,dim:int) -> jnp.array: - state = np.random.randint(0,10*5) + def generate_normal(sample: int, dim: int) -> jnp.array: + state = np.random.randint(0, 10 * 5) key = jax.random.PRNGKey(state) _, subkey = jax.random.split(key) X = jax.random.normal(subkey, shape=(sample, dim)) return X - + @jax.jit - def normalize_vector(v:jnp.array): - return v/jnp.linalg.norm(v) - + def normalize_vector(v: jnp.array): + return v / jnp.linalg.norm(v) + @jax.jit - def compute_norm(mat:np.ndarray) -> jnp.float32: + def compute_norm(mat: np.ndarray) -> jnp.float32: return jnp.linalg.norm(mat) - + normalize = jax.jit(normalize_vector) two_norm = jax.jit(compute_norm) numit = 0 @@ -76,45 +84,73 @@ def compute_norm(mat:np.ndarray) -> jnp.float32: boundary.append(l) rem = dim - len(boundary) for _ in range(rem): - eps = np.random.randint(0,10*2) * 1e-19 - if type(boundary[-1]) is float or type(boundary[-1]) is np.float64 or type(boundary[-1]) is np.float32: + eps = np.random.randint(0, 10 * 2) * 1e-19 + if ( + type(boundary[-1]) is float + or type(boundary[-1]) is np.float64 + or type(boundary[-1]) is np.float32 + ): boundary[-1] = np.array([boundary[-1]]) - arr = np.asarray([i+eps for i in boundary[-1]]) + arr = np.asarray([i + eps for i in boundary[-1]]) boundary.append(arr) else: for _ in range(remain): - epsilon = np.random.randint(0,10*2) * 1e-19 - if type(boundary[-1]) is float or type(boundary[-1]) is np.float64 or type(boundary[-1]) is np.float32: + epsilon = 
np.random.randint(0, 10 * 2) * 1e-19 + if ( + type(boundary[-1]) is float + or type(boundary[-1]) is np.float64 + or type(boundary[-1]) is np.float32 + ): boundary[-1] = np.array([boundary[-1]]) - arr = np.asarray([i+epsilon for i in boundary[-1]]) + arr = np.asarray([i + epsilon for i in boundary[-1]]) boundary.append(arr) break - X = generate_normal(sample,dim) + X = generate_normal(sample, dim) for i in range(sample): rand_vector = normalize(X[i, :]) rand_vector = np.asarray(rand_vector) rand_vector = rand_vector.tolist() for i in range(len(rand_vector)): rand_vector[i] = rand_vector[i] * radius - norm = two_norm(rand_vector-center[0]) - if np.abs(norm - radius) < eps and len(boundary) < dim and type(boundary) is set: + norm = two_norm(rand_vector - center[0]) + if ( + np.abs(norm - radius) < eps + and len(boundary) < dim + and type(boundary) is set + ): rand_vector = tuple(rand_vector) boundary.add(rand_vector) elif len(boundary) >= dim: break - elif type(boundary) is list and np.abs(norm - radius) < eps and len(boundary) < dim: + elif ( + type(boundary) is list + and np.abs(norm - radius) < eps + and len(boundary) < dim + ): boundary.append(rand_vector) numit += 1 boundary = list(boundary) for i, it in enumerate(boundary): boundary[i] = np.asarray(it) dim_check = list([boundary[0].size]) - if dim_check[0] < dim or type(boundary[0]) is not np.ndarray or ((type(boundary) is np.ndarray or type(boundary) is list) and type(boundary[0]) is np.float64): + if ( + dim_check[0] < dim + or type(boundary[0]) is not np.ndarray + or ( + (type(boundary) is np.ndarray or type(boundary) is list) + and type(boundary[0]) is np.float64 + ) + ): for i, b in enumerate(boundary): if type(b) is not list and type(b) is not np.ndarray: boundary[i] = np.asarray([b for _ in range(dim)]).ravel() else: boundary[i] = np.asarray([b.ravel() for _ in range(dim)]).ravel() boundary = np.asarray(boundary) - assert len(boundary) == dim and len(boundary[0]) == dim and type(boundary) is np.ndarray and boundary.shape == (dim, dim) - return nsphere, boundary \ No newline at end of file + assert ( + len(boundary) == dim + and len(boundary[0]) == dim + and type(boundary) is np.ndarray + and boundary.shape == (dim, dim) + ) + return nsphere, boundary diff --git a/src/polyatomic_complex.py b/src/polyatomic_complex.py index 6d19f24..1b213d0 100644 --- a/src/polyatomic_complex.py +++ b/src/polyatomic_complex.py @@ -7,9 +7,17 @@ from core_utils import GluingMap, ElectronField from building_blocks import Electron + class PolyAtomComplex: - def __init__(self, atom_list, using_radial=False, using_force=False, update_forces=None, update_radial=None): + def __init__( + self, + atom_list, + using_radial=False, + using_force=False, + update_forces=None, + update_radial=None, + ): assert isinstance(atom_list, list) and len(atom_list) > 0 assert isinstance(using_radial, bool) assert isinstance(using_force, bool) @@ -17,33 +25,33 @@ def __init__(self, atom_list, using_radial=False, using_force=False, update_forc self.using_radial = using_radial self.using_force = using_force self.cwd = os.getcwd() - self.datapath = self.cwd + '/dataset/construct' - assert 'atom_lookup.pkl' in os.listdir(self.datapath) - assert 'lookup_map.json' in os.listdir(self.datapath) - self.reference = self.datapath + '/lookup_map.json' - with open(self.reference, 'rb') as f: + self.datapath = self.cwd + "/dataset/construct" + assert "atom_lookup.pkl" in os.listdir(self.datapath) + assert "lookup_map.json" in os.listdir(self.datapath) + self.reference = self.datapath + 
"/lookup_map.json" + with open(self.reference, "rb") as f: self.lookup_map = json.load(f) assert isinstance(self.lookup_map, dict) assert isinstance(using_radial, bool) assert isinstance(using_force, bool) if using_radial: - assert hasattr(update_radial, '__call__') + assert hasattr(update_radial, "__call__") self.update_radial = update_radial if using_force: - assert hasattr(update_forces, '__call__') + assert hasattr(update_forces, "__call__") self.update_forces = update_forces - + def gen_elec_info(self, k) -> List: self.AE = [] self.d_e = 0 - if isinstance(self.d_e,int): - electrons = Electron(self.d_e, k).build_electron() + if isinstance(self.d_e, int): + electrons = Electron(self.d_e, k).build_electron() self.AE = list(electrons) return self.AE - + def fast_build_complex(self) -> Tuple: - lookup = self.datapath+'/atom_lookup.pkl' - with open(lookup, 'rb') as f: + lookup = self.datapath + "/atom_lookup.pkl" + with open(lookup, "rb") as f: lookup = dill.load(f) C = [] for a in self.atoms: @@ -51,10 +59,10 @@ def fast_build_complex(self) -> Tuple: C += list(acomplex[0].ravel()) C = np.asarray(C) return tuple([C, [], [], []]) - + def general_build_complex(self): - lookup = self.datapath+'/atom_lookup.pkl' - with open(lookup, 'rb') as f: + lookup = self.datapath + "/atom_lookup.pkl" + with open(lookup, "rb") as f: lookup = dill.load(f) eleminfo = self.lookup_map A = [] @@ -68,29 +76,31 @@ def general_build_complex(self): self.F = np.array([]) self.DE = np.array([]) - def disjont_union(gluing_map:np.ndarray, skeleton:np.ndarray, particle:np.ndarray) -> np.ndarray: + def disjont_union( + gluing_map: np.ndarray, skeleton: np.ndarray, particle: np.ndarray + ) -> np.ndarray: skeleton = np.concatenate([skeleton.ravel(), gluing_map.ravel()]) K = np.concatenate([skeleton, particle.ravel()]) K = np.unique(K) return K - def electron_union(EF_1:tuple, EF_2:tuple) -> None: + def electron_union(EF_1: tuple, EF_2: tuple) -> None: ee0_1 = EF_1[0] wv_1 = EF_1[1] - assert isinstance(ee0_1,np.ndarray) - assert isinstance(wv_1,np.ndarray) + assert isinstance(ee0_1, np.ndarray) + assert isinstance(wv_1, np.ndarray) e_field = np.union1d(ee0_1, []) wves = np.concatenate([wv_1.ravel()]) e_field, waves = e_field.ravel(), wves.ravel() self.E.add_electrons(e_field, waves) return None - - def dir_sum(a:np.ndarray,b:np.ndarray) -> np.ndarray: - dsum = np.zeros(np.add(a.shape,b.shape)) - dsum[:a.shape[0],:a.shape[1]]=a - dsum[a.shape[0]:,a.shape[1]:]=b + + def dir_sum(a: np.ndarray, b: np.ndarray) -> np.ndarray: + dsum = np.zeros(np.add(a.shape, b.shape)) + dsum[: a.shape[0], : a.shape[1]] = a + dsum[a.shape[0] :, a.shape[1] :] = b return dsum - + while A: I = A.pop(0) num_elec, k, _, df, de = I @@ -108,30 +118,94 @@ def dir_sum(a:np.ndarray,b:np.ndarray) -> np.ndarray: if self.using_radial: self.DE = dir_sum(self.DE, de) self.update_radial(self.DE) - + return tuple([C, self.E.get_efield(), self.F, self.DE]) + def sanity_test(atom_list, kind): pac = PolyAtomComplex(atom_list) - if kind == 'general': + if kind == "general": try: pac.general_build_complex() - print(f'success ✅') + print(f"success ✅") except: - print('failed ❌') + print("failed ❌") else: try: pac.fast_build_complex() - print(f'success ✅') + print(f"success ✅") except: - print('failed ❌') - + print("failed ❌") -if __name__ == '__main__': - sanity_test(['H', 'H', 'O'], '') - sanity_test(['C', 'H', 'H', 'H'], '') - sanity_test(['Np','U', 'P', 'P', 'P', 'P', 'H', 'H', 'H', 'H', 'C', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'C', 'O', 'O', 'O', 'O', 'O', 'O', 
'O', 'O'], '') - sanity_test(['H', 'H', 'O'], 'general') - sanity_test(['C', 'H', 'H', 'H'], 'general') - sanity_test(['Np','U', 'P', 'P', 'P', 'P', 'H', 'H', 'H', 'H', 'C', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'C', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O'], 'general') +if __name__ == "__main__": + sanity_test(["H", "H", "O"], "") + sanity_test(["C", "H", "H", "H"], "") + sanity_test( + [ + "Np", + "U", + "P", + "P", + "P", + "P", + "H", + "H", + "H", + "H", + "C", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + "C", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + ], + "", + ) + sanity_test(["H", "H", "O"], "general") + sanity_test(["C", "H", "H", "H"], "general") + sanity_test( + [ + "Np", + "U", + "P", + "P", + "P", + "P", + "H", + "H", + "H", + "H", + "C", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + "C", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + "O", + ], + "general", + ) diff --git a/src/process_esol.py b/src/process_esol.py index fa3f6da..9f9fdf3 100644 --- a/src/process_esol.py +++ b/src/process_esol.py @@ -6,63 +6,74 @@ from collections import defaultdict from polyatomic_complex import PolyAtomComplex + class ProcessESOL: def __init__(self): - self.src = os.getcwd()+'/dataset/esol/' - assert 'ESOL.csv' in os.listdir(self.src) - self.datapath = self.src + 'ESOL.csv' + self.src = os.getcwd() + "/dataset/esol/" + assert "ESOL.csv" in os.listdir(self.src) + self.datapath = self.src + "ESOL.csv" self.data = pd.read_csv(self.datapath) assert isinstance(self.data, pd.DataFrame) - + def process(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['smiles']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["smiles"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) representations[i] = PolyAtomComplex(atom_list=atoms).fast_build_complex() - - with open(self.src+'fast_complex_lookup_repn.pkl', 'wb') as f: + + with open(self.src + "fast_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - + def process_deep_complexes(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['smiles']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["smiles"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) - representations[i] = PolyAtomComplex(atom_list=atoms).general_build_complex() - - with open(self.src+'deep_complex_lookup_repn.pkl', 'wb') as f: + representations[i] = PolyAtomComplex( + atom_list=atoms + ).general_build_complex() + + with open(self.src + "deep_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - def smiles_to_atoms(self, smile:str) -> list: + def smiles_to_atoms(self, smile: str) -> list: assert isinstance(smile, str) mol = Chem.MolFromSmiles(smile) Chem.Kekulize(mol) atom_counts = {} for atom in mol.GetAtoms(): - neighbors = [(neighbor.GetSymbol(), bond.GetBondType()) - for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds())] + neighbors = [ + (neighbor.GetSymbol(), bond.GetBondType()) + for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds()) + ] neighbors.sort() - key = "{}-{}".format(atom.GetSymbol(), "".join(f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" for symbol, bond_order in neighbors)) + key = "{}-{}".format( + atom.GetSymbol(), + "".join( + f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" + for symbol, 
bond_order in neighbors + ), + ) atom_counts[key] = atom_counts.get(key, 0) + 1 - regex = re.compile('[^a-zA-Z]') + regex = re.compile("[^a-zA-Z]") atoms_list = [] for k in atom_counts: - cleaned = regex.sub(' ', k).split(' ') + cleaned = regex.sub(" ", k).split(" ") res = [] for l in cleaned: r = l.strip() - if r != '' and ' ' not in r: + if r != "" and " " not in r: res.append(r) atoms_list += res return atoms_list -if __name__ == '__main__': +if __name__ == "__main__": prc = ProcessESOL() - #prc.process() + # prc.process() prc.process_deep_complexes() diff --git a/src/process_freesolv.py b/src/process_freesolv.py index 38fcfda..002b01d 100644 --- a/src/process_freesolv.py +++ b/src/process_freesolv.py @@ -6,63 +6,74 @@ from collections import defaultdict from polyatomic_complex import PolyAtomComplex + class ProcessFreeSolv: def __init__(self): - self.src = os.getcwd()+'/dataset/free_solv/' - assert 'FreeSolv.csv' in os.listdir(self.src) - self.datapath = self.src + 'FreeSOLV.csv' + self.src = os.getcwd() + "/dataset/free_solv/" + assert "FreeSolv.csv" in os.listdir(self.src) + self.datapath = self.src + "FreeSOLV.csv" self.data = pd.read_csv(self.datapath) assert isinstance(self.data, pd.DataFrame) - + def process(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['smiles']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["smiles"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) representations[i] = PolyAtomComplex(atom_list=atoms).fast_build_complex() - - with open(self.src+'fast_complex_lookup_repn.pkl', 'wb') as f: + + with open(self.src + "fast_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - + def process_deep_complexes(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['smiles']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["smiles"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) - representations[i] = PolyAtomComplex(atom_list=atoms).general_build_complex() - - with open(self.src+'deep_complex_lookup_repn.pkl', 'wb') as f: + representations[i] = PolyAtomComplex( + atom_list=atoms + ).general_build_complex() + + with open(self.src + "deep_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - def smiles_to_atoms(self, smile:str) -> list: + def smiles_to_atoms(self, smile: str) -> list: assert isinstance(smile, str) mol = Chem.MolFromSmiles(smile) Chem.Kekulize(mol) atom_counts = {} for atom in mol.GetAtoms(): - neighbors = [(neighbor.GetSymbol(), bond.GetBondType()) - for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds())] + neighbors = [ + (neighbor.GetSymbol(), bond.GetBondType()) + for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds()) + ] neighbors.sort() - key = "{}-{}".format(atom.GetSymbol(), "".join(f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" for symbol, bond_order in neighbors)) + key = "{}-{}".format( + atom.GetSymbol(), + "".join( + f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" + for symbol, bond_order in neighbors + ), + ) atom_counts[key] = atom_counts.get(key, 0) + 1 - regex = re.compile('[^a-zA-Z]') + regex = re.compile("[^a-zA-Z]") atoms_list = [] for k in atom_counts: - cleaned = regex.sub(' ', k).split(' ') + cleaned = regex.sub(" ", k).split(" ") res = [] for l in cleaned: r = l.strip() - if r != '' 
and ' ' not in r: + if r != "" and " " not in r: res.append(r) atoms_list += res return atoms_list -if __name__ == '__main__': +if __name__ == "__main__": prc = ProcessFreeSolv() - #prc.process() + # prc.process() prc.process_deep_complexes() diff --git a/src/process_lipophilicity.py b/src/process_lipophilicity.py index 437a7ba..433eb0d 100644 --- a/src/process_lipophilicity.py +++ b/src/process_lipophilicity.py @@ -6,63 +6,74 @@ from collections import defaultdict from polyatomic_complex import PolyAtomComplex + class ProcessLipophilicity: def __init__(self): - self.src = os.getcwd()+'/dataset/lipophilicity/' - assert 'Lipophilicity.csv' in os.listdir(self.src) - self.datapath = self.src + 'Lipophilicity.csv' + self.src = os.getcwd() + "/dataset/lipophilicity/" + assert "Lipophilicity.csv" in os.listdir(self.src) + self.datapath = self.src + "Lipophilicity.csv" self.data = pd.read_csv(self.datapath) assert isinstance(self.data, pd.DataFrame) - + def process(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['smiles']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["smiles"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) representations[i] = PolyAtomComplex(atom_list=atoms).fast_build_complex() - - with open(self.src+'fast_complex_lookup_repn.pkl', 'wb') as f: + + with open(self.src + "fast_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - + def process_deep_complexes(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['smiles']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["smiles"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) - representations[i] = PolyAtomComplex(atom_list=atoms).general_build_complex() - - with open(self.src+'deep_complex_lookup_repn.pkl', 'wb') as f: + representations[i] = PolyAtomComplex( + atom_list=atoms + ).general_build_complex() + + with open(self.src + "deep_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - def smiles_to_atoms(self, smile:str) -> list: + def smiles_to_atoms(self, smile: str) -> list: assert isinstance(smile, str) mol = Chem.MolFromSmiles(smile) Chem.Kekulize(mol) atom_counts = {} for atom in mol.GetAtoms(): - neighbors = [(neighbor.GetSymbol(), bond.GetBondType()) - for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds())] + neighbors = [ + (neighbor.GetSymbol(), bond.GetBondType()) + for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds()) + ] neighbors.sort() - key = "{}-{}".format(atom.GetSymbol(), "".join(f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" for symbol, bond_order in neighbors)) + key = "{}-{}".format( + atom.GetSymbol(), + "".join( + f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" + for symbol, bond_order in neighbors + ), + ) atom_counts[key] = atom_counts.get(key, 0) + 1 - regex = re.compile('[^a-zA-Z]') + regex = re.compile("[^a-zA-Z]") atoms_list = [] for k in atom_counts: - cleaned = regex.sub(' ', k).split(' ') + cleaned = regex.sub(" ", k).split(" ") res = [] for l in cleaned: r = l.strip() - if r != '' and ' ' not in r: + if r != "" and " " not in r: res.append(r) atoms_list += res return atoms_list -if __name__ == '__main__': +if __name__ == "__main__": prc = ProcessLipophilicity() - #prc.process() + # prc.process() prc.process_deep_complexes() diff --git 
a/src/process_materials_project.py b/src/process_materials_project.py index a448db0..5b82d59 100644 --- a/src/process_materials_project.py +++ b/src/process_materials_project.py @@ -7,20 +7,21 @@ from collections import defaultdict from polyatomic_complex import PolyAtomComplex + class ProcessMP: def __init__(self): - self.src = os.getcwd()+'/dataset/materials_project/' - assert 'materials_data.csv' in os.listdir(self.src) - self.datapath = self.src + 'materials_data.csv' + self.src = os.getcwd() + "/dataset/materials_project/" + assert "materials_data.csv" in os.listdir(self.src) + self.datapath = self.src + "materials_data.csv" self.data = pd.read_csv(self.datapath, low_memory=False) assert isinstance(self.data, pd.DataFrame) - + def process(self) -> None: representations = defaultdict(tuple) - for i, data in enumerate(zip(self.data['elements'], self.data['composition'])): + for i, data in enumerate(zip(self.data["elements"], self.data["composition"])): elem, comp = data elem = eval(elem) - print(f'comp {comp}') + print(f"comp {comp}") try: comp = json.loads(comp) atoms = self.extract_atoms(elem, comp) @@ -30,16 +31,16 @@ def process(self) -> None: atoms = comp representations[i] = PolyAtomComplex(atom_list=atoms).fast_build_complex() assert len(representations) == len(self.data) - with open(self.src+'fast_complex_lookup_repn.pkl', 'wb') as f: + with open(self.src + "fast_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - + def process_deep_complexes(self) -> None: representations = defaultdict(tuple) def helper(data): - i,row = data - elem,comp = row + i, row = data + elem, comp = row elem = eval(elem) try: comp = json.loads(comp) @@ -51,26 +52,31 @@ def helper(data): pc = PolyAtomComplex(atom_list=atoms) repn = pc.general_build_complex() representations[i] = repn - print('done') + print("done") return repn - + with Pool() as p: - p.map(func=helper, iterable=list(enumerate(zip(self.data['elements'], self.data['composition'])))) + p.map( + func=helper, + iterable=list( + enumerate(zip(self.data["elements"], self.data["composition"])) + ), + ) assert len(representations) == len(self.data) - with open(self.src+'deep_complex_lookup_repn.pkl', 'wb') as f: + with open(self.src + "deep_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - def extract_atoms(self, element, composition) -> List: atom_list = [] assert isinstance(composition, dict) and isinstance(element, list) for k in element: - atom_list += [k]*int(composition[k]) + atom_list += [k] * int(composition[k]) return atom_list -if __name__ == '__main__': + +if __name__ == "__main__": prc = ProcessMP() - #prc.process() + # prc.process() prc.process_deep_complexes() diff --git a/src/process_photoswitches.py b/src/process_photoswitches.py index 4ad07b3..589a7cc 100644 --- a/src/process_photoswitches.py +++ b/src/process_photoswitches.py @@ -6,63 +6,74 @@ from collections import defaultdict from polyatomic_complex import PolyAtomComplex + class ProcessPhotoswitches: def __init__(self): - self.src = os.getcwd()+'/dataset/photoswitches/' - assert 'photoswitches.csv' in os.listdir(self.src) - self.datapath = self.src + 'photoswitches.csv' + self.src = os.getcwd() + "/dataset/photoswitches/" + assert "photoswitches.csv" in os.listdir(self.src) + self.datapath = self.src + "photoswitches.csv" self.data = pd.read_csv(self.datapath) assert isinstance(self.data, pd.DataFrame) - + def process(self) -> None: representations = defaultdict(tuple) - for i, row in 
enumerate(self.data['SMILES']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["SMILES"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) representations[i] = PolyAtomComplex(atom_list=atoms).fast_build_complex() - - with open(self.src+'fast_complex_lookup_repn.pkl', 'wb') as f: + + with open(self.src + "fast_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - + def process_deep_complexes(self) -> None: representations = defaultdict(tuple) - for i, row in enumerate(self.data['SMILES']): - print(f'row {row}') - print(f'tpe {type(row)}') + for i, row in enumerate(self.data["SMILES"]): + print(f"row {row}") + print(f"tpe {type(row)}") atoms = self.smiles_to_atoms(row) - representations[i] = PolyAtomComplex(atom_list=atoms).general_build_complex() - - with open(self.src+'deep_complex_lookup_repn.pkl', 'wb') as f: + representations[i] = PolyAtomComplex( + atom_list=atoms + ).general_build_complex() + + with open(self.src + "deep_complex_lookup_repn.pkl", "wb") as f: dill.dump(representations, f) return None - def smiles_to_atoms(self, smile:str) -> list: + def smiles_to_atoms(self, smile: str) -> list: assert isinstance(smile, str) mol = Chem.MolFromSmiles(smile) Chem.Kekulize(mol) atom_counts = {} for atom in mol.GetAtoms(): - neighbors = [(neighbor.GetSymbol(), bond.GetBondType()) - for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds())] + neighbors = [ + (neighbor.GetSymbol(), bond.GetBondType()) + for neighbor, bond in zip(atom.GetNeighbors(), atom.GetBonds()) + ] neighbors.sort() - key = "{}-{}".format(atom.GetSymbol(), "".join(f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" for symbol, bond_order in neighbors)) + key = "{}-{}".format( + atom.GetSymbol(), + "".join( + f"{symbol}{'-' if bond_order == 1 else '=' if bond_order == 2 else '#'}" + for symbol, bond_order in neighbors + ), + ) atom_counts[key] = atom_counts.get(key, 0) + 1 - regex = re.compile('[^a-zA-Z]') + regex = re.compile("[^a-zA-Z]") atoms_list = [] for k in atom_counts: - cleaned = regex.sub(' ', k).split(' ') + cleaned = regex.sub(" ", k).split(" ") res = [] for l in cleaned: r = l.strip() - if r != '' and ' ' not in r: + if r != "" and " " not in r: res.append(r) atoms_list += res return atoms_list -if __name__ == '__main__': +if __name__ == "__main__": prc = ProcessPhotoswitches() - #prc.process() + # prc.process() prc.process_deep_complexes()
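
The classes reformatted above are easiest to read together; what follows is a minimal usage sketch, assuming the repository layout these modules themselves assert (run from the repo root, with dataset/construct/lookup_map.json and atom_lookup.pkl already built):

    from atomic_complex import AtomComplex
    from polyatomic_complex import PolyAtomComplex

    # A single atom, using the same settings BuildAtoms uses when it fills
    # dataset/construct/atom_lookup.pkl: cutoff 5, proton/neutron dims 3,
    # electron dims 0.
    carbon = AtomComplex(protons=6, neutrons=6, electrons=6, cutoff=5,
                         proton_dims=3, neutron_dims=3, electron_dims=0)
    skeleton, electron_info, force_field, radial = carbon.fast_build_complex()

    # A molecule, assembled from the pickled per-atom complexes in
    # dataset/construct/atom_lookup.pkl.
    water = PolyAtomComplex(atom_list=["H", "H", "O"])
    repn, efield, forces, radial_contrib = water.fast_build_complex()
    print(repn.shape)  # flat numpy array used as the molecular representation

The process_* scripts apply this same fast_build_complex (or general_build_complex) to every row of their dataset and pickle the results as fast_complex_lookup_repn.pkl / deep_complex_lookup_repn.pkl, which the "complexes" and "deep_complexes" branches of the experiment scripts then load through LoadDatasetForTask.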