Content-Length: 39177 | pFad | http://github.com/postgresml/postgresml/pull/1.diff

thub.com diff --git a/README.md b/README.md index 1883fb017..46b7cc760 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,82 @@ -## Postgres ML demo +## PostgresML + +PostgresML aims to be the easiest way to gain value from machine learning. Anyone with a basic understanding of SQL should be able to build and deploy models to production, while receiving the benefits of a high performance machine learning platform. PostgresML leverages state of the art algorithms with built in best practices, without having to setup additional infrastructure or learn additional programming languages. + +Getting started is as easy as creating a `table` or `view` that holds the training data, and then registering that with PostgresML. + +```sql +SELECT pgml.model_regression('Red Wine Quality', training_data_table_or_view_name, label_column_name); +``` + +And predict novel datapoints: + +```sql +SELECT pgml.predict('Red Wine Quality', red_wines.*) +FROM pgml.red_wines +LIMIT 3; + + quality +--------- + 0.896432 + 0.834822 + 0.954502 +(3 rows) +``` + +PostgresML similarly supports classification to predict discrete classes rather than numeric scores for novel data. + +```sql +SELECT pgml.create_classification('Handwritten Digit Classifier', pgml.mnist_training_data, label_column_name); +``` + +And predict novel datapoints: + +```sql +SELECT pgml.predict('Handwritten Digit Classifier', pgml.mnist_test_data.*) +FROM pgml.mnist +LIMIT 1; + + digit | likelihood +-------+---- + 5 | 0.956432 +(1 row) +``` + +Checkout the [documentation](https://TODO) to view the full capabilities, including: +- [Creating Training Sets](https://TODO) + - [Classification](https://TODO) + - [Regression](https://TODO) +- [Supported Algorithms](https://TODO) + - [Scikit Learn](https://TODO) + - [XGBoost](https://TODO) + - [Tensorflow](https://TODO) + - [PyTorch](https://TODO) + +### Planned features +- Model management dashboard +- Data explorer +- More algorithms and libraries incluiding custom algorithm support + + +### FAQ + +*How well does this scale?* + +Petabyte sized Postgres deployements are [documented](https://www.computerworld.com/article/2535825/size-matters--yahoo-claims-2-petabyte-database-is-world-s-biggest--busiest.html) in production since at least 2008, and [recent patches](https://www.2ndquadrant.com/en/blog/postgresql-maximum-table-size/) have enabled working beyond exabyte up to the yotabyte scale. Machine learning models can be horizontally scaled using well tested Postgres replication techniques on top of a mature storage and compute platform. + +*How reliable is this system?* + +Postgres is widely considered mission critical, and some of the most [reliable](https://www.postgresql.org/docs/current/wal-reliability.html) technology in any modern stack. PostgresML allows an infrastructure organization to leverage pre-existing best practices to deploy machine learning into production with less risk and effort than other systems. For example, model backup and recovery happens automatically alongside normal data backup procedures. + +*How good are the models?* + +Model quality is often a tradeoff between compute resources and incremental quality improvements. PostgresML allows stakeholders to choose algorithms from several libraries that will provide the most bang for the buck. In addition, PostgresML automatically applies best practices for data cleaning like imputing missing values by default and normalizing data to prevent common problems in production. After quickly enabling 0 to 1 value creation, PostgresML enables further expert iteration with custom data preperation and algorithm implementations. Like most things in life, the ultimate in quality will be a concerted effort of experts working over time, but that shouldn't get in the way of a quick start. + +*Is PostgresML fast?* + +Colocating the compute with the data inside the database removes one of the most common latency bottlenecks in the ML stack, which is the (de)serialization of data between stores and services across the wire. Modern versions of Postgres also support automatic query parrellization across multiple workers to further minimize latency in large batch workloads. Finally, PostgresML will utilize GPU compute if both the algorithm and hardware support it, although it is currently rare in practice for production databases to have GPUs. Checkout our [benchmarks](https://todo). + -Quick demo with Postgres, PL/Python, and Scikit. ### Installation in WSL or Ubuntu @@ -29,11 +105,9 @@ Install Scikit globally (I didn't bother setup Postgres with a virtualenv, but i sudo pip3 install sklearn ``` -### Run the demo +### Run the example ```bash -sudo mkdir /app/models -sudo chown postgres:postgres /app/models psql -f scikit_train_and_predict.sql ``` diff --git a/benchmarks.sql b/benchmarks.sql new file mode 100644 index 000000000..f2a6bfc5c --- /dev/null +++ b/benchmarks.sql @@ -0,0 +1,23 @@ +-- +-- CREATE EXTENSION +-- +CREATE EXTENSION IF NOT EXISTS plpython3u; + +CREATE OR REPLACE FUNCTION pg_call() +RETURNS INT +AS $$ +BEGIN + RETURN 1; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION py_call() +RETURNS INT +AS $$ + return 1; +$$ LANGUAGE plpython3u; + +\timing on +SELECT generate_series(1, 50000), pg_call(); -- Time: 20.679 ms +SELECT generate_series(1, 50000), py_call(); -- Time: 67.355 ms + diff --git a/pgml/pgml/model.py b/pgml/pgml/model.py new file mode 100644 index 000000000..b34145aca --- /dev/null +++ b/pgml/pgml/model.py @@ -0,0 +1,394 @@ +import plpy +from sklearn.linear_model import LinearRegression +from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier +from sklearn.model_selection import train_test_split +from sklearn.metrics import mean_squared_error, r2_score + +import pickle + +from pgml.exceptions import PgMLException +from pgml.sql import q + +class Project(object): + """ + Use projects to refine multiple models of a particular dataset on a specific objective. + + Attributes: + id (int): a unique identifier + name (str): a human friendly unique identifier + objective (str): the purpose of this project + created_at (Timestamp): when this project was created + updated_at (Timestamp): when this project was last updated + """ + + _cache = {} + + def __init__(self): + self._deployed_model = None + + @classmethod + def find(cls, id: int): + """ + Get a Project from the database. + + Args: + id (int): the project id + Returns: + Project or None: instantiated from the database if found + """ + result = plpy.execute(f""" + SELECT * + FROM pgml.projects + WHERE id = {q(id)} + """, 1) + if len(result) == 0: + return None + + project = Project() + project.__dict__ = dict(result[0]) + project.__init__() + cls._cache[project.name] = project + return project + + @classmethod + def find_by_name(cls, name: str): + """ + Get a Project from the database by name. + + This is the prefered API to retrieve projects, and they are cached by + name to avoid needing to go to he database on every usage. + + Args: + name (str): the project name + Returns: + Project or None: instantiated from the database if found + """ + if name in cls._cache: + return cls._cache[name] + + result = plpy.execute(f""" + SELECT * + FROM pgml.projects + WHERE name = {q(name)} + """, 1) + if len(result)== 0: + return None + + project = Project() + project.__dict__ = dict(result[0]) + project.__init__() + cls._cache[name] = project + return project + + @classmethod + def create(cls, name: str, objective: str): + """ + Create a Project and save it to the database. + + Args: + name (str): a human friendly identifier + objective (str): valid values are ["regression", "classification"]. + Returns: + Project: instantiated from the database + """ + + project = Project() + project.__dict__ = dict(plpy.execute(f""" + INSERT INTO pgml.projects (name, objective) + VALUES ({q(name)}, {q(objective)}) + RETURNING * + """, 1)[0]) + project.__init__() + cls._cache[name] = project + return project + + @property + def deployed_model(self): + """ + Returns: + Model: that should currently be used for predictions + """ + if self._deployed_model is None: + self._deployed_model = Model.find_deployed(self.id) + return self._deployed_model + +class Snapshot(object): + """ + Snapshots capture a set of training & test data for repeatability. + + Attributes: + id (int): a unique identifier + relation_name (str): the name of the table or view to snapshot + y_column_name (str): the label for training data + test_size (float or int, optional): If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. If None, the value is set to the complement of the train size. If train_size is also None, it will be set to 0.25. + test_sampling (str, optional): How to sample to create the test data. Defaults to "random". Valid values are ["first", "last", "random"]. + status (str): The current status of the snapshot, e.g. 'new' or 'created' + created_at (Timestamp): when this snapshot was created + updated_at (Timestamp): when this snapshot was last updated + """ + @classmethod + def create(cls, relation_name: str, y_column_name: str, test_size: float or int, test_sampling: str): + """ + Create a Snapshot and save it to the database. + + This creates both a metadata record in the snapshots table, as well as creating a new table + that holds a snapshot of all the data currently present in the relation so that training + runs may be repeated, or further analysis may be conducted against the input. + + Args: + relation_name (str): the name of the table or view to snapshot + y_column_name (str): the label for training data + test_size (float or int, optional): If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. If None, the value is set to the complement of the train size. If train_size is also None, it will be set to 0.25. + test_sampling: (str, optional): How to sample to create the test data. Defaults to "random". Valid values are ["first", "last", "random"]. + Returns: + Snapshot: metadata instantiated from the database + """ + + snapshot = Snapshot() + snapshot.__dict__ = dict(plpy.execute(f""" + INSERT INTO pgml.snapshots (relation_name, y_column_name, test_size, test_sampling, status) + VALUES ({q(relation_name)}, {q(y_column_name)}, {q(test_size)}, {q(test_sampling)}, 'new') + RETURNING * + """, 1)[0]) + plpy.execute(f""" + CREATE TABLE pgml."snapshot_{snapshot.id}" AS + SELECT * FROM "{snapshot.relation_name}"; + """) + snapshot.__dict__ = dict(plpy.execute(f""" + UPDATE pgml.snapshots + SET status = 'created' + WHERE id = {q(snapshot.id)} + RETURNING * + """, 1)[0]) + return snapshot + + def data(self): + """ + Returns: + list, list, list, list: All rows from the snapshot split into X_train, X_test, y_train, y_test sets. + """ + data = plpy.execute(f""" + SELECT * + FROM pgml."snapshot_{self.id}" + """) + + print(data) + # Sanity check the data + if len(data) == 0: + PgMLException( + f"Relation `{self.relation_name}` contains no rows. Did you pass the correct `relation_name`?" + ) + if self.y_column_name not in data[0]: + PgMLException( + f"Column `{self.y_column_name}` not found. Did you pass the correct `y_column_name`?" + ) + + # Always pull the columns in the same order from the row. + # Python dict iteration is not always in the same order (hash table). + columns = list(data[0].keys()) + columns.remove(self.y_column_name) + columns.sort() + + # Split the label from the features + X = [] + y = [] + for row in data: + y_ = row.pop(self.y_column_name) + x_ = [] + + for column in columns: + x_.append(row[column]) + + X.append(x_) + y.append(y_) + + # Split into training and test sets + if self.test_sampling == 'random': + return train_test_split(X, y, test_size=self.test_size, random_state=0) + else: + if self.test_sampling == 'first': + X.reverse() + y.reverse() + if isinstance(split, float): + split = 1.0 - split + split = self.test_size + if isinstance(split, float): + split = int(self.test_size * X.len()) + return X[:split], X[split:], y[:split], y[split:] + + + # TODO normalize and clean data + +class Model(object): + """Models use an algorithm on a snapshot of data to record the parameters learned. + + Attributes: + project (str): the project the model belongs to + snapshot (str): the snapshot that provides the training and test data + algorithm_name (str): the name of the algorithm used to train this model + status (str): The current status of the model, e.g. 'new', 'training' or 'successful' + created_at (Timestamp): when this model was created + updated_at (Timestamp): when this model was last updated + mean_squared_error (float): + r2_score (float): + pickle (bytes): the serialized version of the model parameters + algorithm: the in memory version of the model parameters that can make predictions + """ + @classmethod + def create(cls, project: Project, snapshot: Snapshot, algorithm_name: str): + """ + Create a Model and save it to the database. + + Args: + project (str): + snapshot (str): + algorithm_name (str): + Returns: + Model: instantiated from the database + """ + result = plpy.execute(f""" + INSERT INTO pgml.models (project_id, snapshot_id, algorithm_name, status) + VALUES ({q(project.id)}, {q(snapshot.id)}, {q(algorithm_name)}, 'new') + RETURNING * + """) + model = Model() + model.__dict__ = dict(result[0]) + model.__init__() + model._project = project + return model + + @classmethod + def find_deployed(cls, project_id: int): + """ + Args: + project_id (int): The project id + Returns: + Model: that should currently be used for predictions of the project + """ + result = plpy.execute(f""" + SELECT models.* + FROM pgml.models + JOIN pgml.deployments + ON deployments.model_id = models.id + AND deployments.project_id = {q(project_id)} + ORDER by deployments.created_at DESC + LIMIT 1 + """) + if len(result) == 0: + return None + + model = Model() + model.__dict__ = dict(result[0]) + model.__init__() + return model + + def __init__(self): + self._algorithm = None + self._project = None + + @property + def project(self): + """ + Returns: + Project: that this model belongs to + """ + if self._project is None: + self._project = Project.find(self.project_id) + return self._project + + @property + def algorithm(self): + if self._algorithm is None: + if self.pickle is not None: + self._algorithm = pickle.loads(self.pickle) + else: + self._algorithm = { + 'linear_regression': LinearRegression, + 'random_forest_regression': RandomForestRegressor, + 'random_forest_classification': RandomForestClassifier + }[self.algorithm_name + '_' + self.project.objective]() + + return self._algorithm + + def fit(self, snapshot: Snapshot): + """ + Learns the parameters of this model and records them in the database. + + Args: + snapshot (Snapshot): dataset used to train this model + """ + X_train, X_test, y_train, y_test = snapshot.data() + + # Train the model + self.algorithm.fit(X_train, y_train) + + # Test + y_pred = self.algorithm.predict(X_test) + msq = mean_squared_error(y_test, y_pred) + r2 = r2_score(y_test, y_pred) + + # Save the model + self.__dict__ = dict(plpy.execute(f""" + UPDATE pgml.models + SET pickle = '\\x{pickle.dumps(self.algorithm).hex()}', + status = 'successful', + mean_squared_error = {q(msq)}, + r2_score = {q(r2)} + WHERE id = {q(self.id)} + RETURNING * + """)[0]) + + def deploy(self): + """Promote this model to the active version for the project that will be used for predictions""" + plpy.execute(f""" + INSERT INTO pgml.deployments (project_id, model_id) + VALUES ({q(self.project_id)}, {q(self.id)}) + """) + + def predict(self, data: list): + """Use the model for a set of features. + + Args: + data (list): list of features to form a single prediction for + + Returns: + float or int: scores for regressions or ints for classifications + """ + return self.algorithm.predict(data) + + +def train( + project_name: str, + objective: str, + relation_name: str, + y_column_name: str, + test_size: float or int = 0.1, + test_sampling: str = "random" +): + """Create a regression model from a table or view filled with training data. + + Args: + project_name (str): a human friendly identifier + objective (str): Defaults to "regression". Valid values are ["regression", "classification"]. + relation_name (str): the table or view that stores the training data + y_column_name (str): the column in the training data that acts as the label + algorithm (str, optional): the algorithm used to implement the objective. Defaults to "linear". Valid values are ["linear", "random_forest"]. + test_size (float or int, optional): If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. If None, the value is set to the complement of the train size. If train_size is also None, it will be set to 0.25. + test_sampling: (str, optional): How to sample to create the test data. Defaults to "random". Valid values are ["first", "last", "random"]. + """ + project = Project.create(project_name, objective) + snapshot = Snapshot.create(relation_name, y_column_name, test_size, test_sampling) + best_model = None + best_error = None + if objective == "regression": + algorithms = ["linear", "random_forest"] + elif objective == "classification": + algorithms = ["random_forest"] + + for algorithm_name in algorithms: + model = Model.create(project, snapshot, algorithm_name) + model.fit(snapshot) + if best_error is None or model.mean_squared_error < best_error: + best_error = model.mean_squared_error + best_model = model + best_model.deploy() diff --git a/pgml/pgml/score.py b/pgml/pgml/score.py deleted file mode 100644 index cbb415825..000000000 --- a/pgml/pgml/score.py +++ /dev/null @@ -1,17 +0,0 @@ -"""Score""" - -import os -import pickle - -from pgml.exceptions import PgMLException - - -def load(name, source): - """Load a model from file.""" - path = os.path.join(source, name) - - if not os.path.exists(path): - raise PgMLException(f"Model source directory `{path}` does not exist.") - - with open(path, "rb") as f: - return pickle.load(f) diff --git a/pgml/pgml/sql.py b/pgml/pgml/sql.py index 508ae6045..79ab69bdc 100644 --- a/pgml/pgml/sql.py +++ b/pgml/pgml/sql.py @@ -1,32 +1,6 @@ -"""Tools to run SQL. -""" -import os - - -def all_rows(cursor): - """Fetch all rows from a plpy-like cursor.""" - while True: - rows = cursor.fetch(5) - if not rows: - return - - for row in rows: - yield row - - -def models_directory(plpy): - """Get the directory where we store our models.""" - data_directory = plpy.execute( - """ - SELECT setting FROM pg_settings WHERE name = 'data_directory' - """, - 1, - )[0]["setting"] - - models_dir = os.path.join(data_directory, "pgml_models") - - # TODO: Ideally this happens during extension installation. - if not os.path.exists(models_dir): - os.mkdir(models_dir, 0o770) - - return models_dir +from plpy import quote_literal + +def q(obj): + if type(obj) == str: + return quote_literal(obj) + return obj diff --git a/pgml/pgml/train.py b/pgml/pgml/train.py deleted file mode 100644 index 968cc6e59..000000000 --- a/pgml/pgml/train.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Train the model. -""" - -# TODO: import more models here -from sklearn.linear_model import LinearRegression -from sklearn.model_selection import train_test_split -from sklearn.metrics import mean_squared_error, r2_score - -import pickle -import os - -from pgml.sql import all_rows -from pgml.exceptions import PgMLException -from pgml.validate import check_type - - -def train(cursor, y_column, name, save=True, destination="/tmp/pgml_models"): - """Train the model on data on some rows. - - Arguments: - - cursor: iterable with rows, - - y_column: the name of the column containing the y predicate (a.k.a solution), - - name: the name of the model, e.g 'test_model', - - save: to save the model to disk or not. - - Return: - Path on disk where the model was saved or could be saved if saved=True. - """ - X = [] - y = [] - columns = [] - - for row in all_rows(cursor): - row = row.copy() - - check_type(row) - - if y_column not in row: - PgMLException( - f"Column `{y}` not found. Did you name your `y_column` correctly?" - ) - - y_ = row.pop(y_column) - x_ = [] - - # Always pull the columns in the same order from the row. - # Python dict iteration is not always in the same order (hash table). - if not columns: - for col in row: - columns.append(col) - - for column in columns: - x_.append(row[column]) - X.append(x_) - y.append(y_) - - X_train, X_test, y_train, y_test = train_test_split(X, y) - - # Just linear regression for now, but can add many more later. - lr = LinearRegression() - lr.fit(X_train, y_train) - - # Test - y_pred = lr.predict(X_test) - msq = mean_squared_error(y_test, y_pred) - r2 = r2_score(y_test, y_pred) - - path = os.path.join(destination, name) - - if save: - with open(path, "wb") as f: - pickle.dump(lr, f) - - return path, msq, r2 diff --git a/pgml/pgml/validate.py b/pgml/pgml/validate.py deleted file mode 100644 index 2fa08acb3..000000000 --- a/pgml/pgml/validate.py +++ /dev/null @@ -1,13 +0,0 @@ -""" -Run some basic sanity checks on the data. -""" - -# import sklearn -from pgml.exceptions import PgMLException - - -def check_type(row): - """We only accept certain column types for now.""" - for col in row: - if type(row[col]) not in (int, float): - raise PgMLException(f"Column '{col}' is not a integer or float.") diff --git a/pgml/tests/plpy.py b/pgml/tests/plpy.py new file mode 100644 index 000000000..4bbbbc6fd --- /dev/null +++ b/pgml/tests/plpy.py @@ -0,0 +1,16 @@ +from collections import deque + +execute_results = deque() + +def quote_literal(literal): + return "'" + literal + "'" + +def execute(sql, lines = 0): + if len(execute_results) > 0: + result = execute_results.popleft() + return result + else: + return [] + +def add_mock_result(result): + execute_results.append(result) diff --git a/pgml/tests/test_model.py b/pgml/tests/test_model.py new file mode 100644 index 000000000..02605982d --- /dev/null +++ b/pgml/tests/test_model.py @@ -0,0 +1,65 @@ +# stub out plpy +from . import plpy +import sys +sys.modules['plpy'] = plpy + +import time +import unittest +from pgml import model + +class TestModel(unittest.TestCase): + def test_the_world(self): + plpy.add_mock_result( + [{"id": 1, "name": "Test", "objective": "regression", "created_at": time.time(), "updated_at": time.time()}] + ) + plpy.add_mock_result( + [{"id": 1, "relation_name": "test", "y_column_name": "test_y", "test_size": 0.1, "test_sampling": "random", "status": "new", "created_at": time.time(), "updated_at": time.time()}] + ) + plpy.add_mock_result( + "OK" + ) + plpy.add_mock_result( + [{"id": 1, "relation_name": "test", "y_column_name": "test_y", "test_size": 0.1, "test_sampling": "random", "status": "created", "created_at": time.time(), "updated_at": time.time()}] + ) + plpy.add_mock_result( + [{"id": 1, "project_id": 1, "snapshot_id": 1, "algorithm_name": "linear", "status": "new", "r2_score": None, "mean_squared_error": None, "pickle": None, "created_at": time.time(), "updated_at": time.time()}] + ) + plpy.add_mock_result( + [ + {"a": 1, "b": 2, "test_y": 3}, + {"a": 2, "b": 3, "test_y": 4}, + {"a": 3, "b": 4, "test_y": 5}, + ] + ) + plpy.add_mock_result( + [{"id": 1, "project_id": 1, "snapshot_id": 1, "algorithm_name": "linear", "status": "new", "r2_score": None, "mean_squared_error": None, "pickle": None, "created_at": time.time(), "updated_at": time.time()}] + ) + + plpy.add_mock_result( + [{"id": 1, "project_id": 1, "snapshot_id": 1, "algorithm_name": "linear", "status": "new", "r2_score": None, "mean_squared_error": None, "pickle": None, "created_at": time.time(), "updated_at": time.time()}] + ) + plpy.add_mock_result( + [ + {"a": 1, "b": 2, "test_y": 3}, + {"a": 2, "b": 3, "test_y": 4}, + {"a": 3, "b": 4, "test_y": 5}, + ] + ) + plpy.add_mock_result( + [{"id": 1, "project_id": 1, "snapshot_id": 1, "algorithm_name": "linear", "status": "new", "r2_score": None, "mean_squared_error": None, "pickle": None, "created_at": time.time(), "updated_at": time.time()}] + ) + + plpy.add_mock_result( + [{"id": 1, "project_id": 1, "snapshot_id": 1, "algorithm_name": "linear", "status": "new", "r2_score": None, "mean_squared_error": None, "pickle": None, "created_at": time.time(), "updated_at": time.time()}] + ) + plpy.add_mock_result( + [ + {"a": 1, "b": 2, "test_y": 3}, + {"a": 2, "b": 3, "test_y": 4}, + {"a": 3, "b": 4, "test_y": 5}, + ] + ) + plpy.add_mock_result( + [{"id": 1, "project_id": 1, "snapshot_id": 1, "algorithm_name": "linear", "status": "new", "r2_score": None, "mean_squared_error": None, "pickle": None, "created_at": time.time(), "updated_at": time.time()}] + ) + model.train("Test", "regression", "test", "test_y") diff --git a/pgml/tests/test_train.py b/pgml/tests/test_train.py deleted file mode 100644 index fe2438ea7..000000000 --- a/pgml/tests/test_train.py +++ /dev/null @@ -1,31 +0,0 @@ -from pgml.train import train - - -class PlPyIterator: - def __init__(self, values): - self._values = values - self._returned = False - - def fetch(self, n): - if self._returned: - return - else: - self._returned = True - return self._values - - -def test_train(): - it = PlPyIterator( - [ - { - "value": 5, - "weight": 5, - }, - { - "value": 34, - "weight": 5, - }, - ] - ) - - train(it, y_column="weight", name="test", save=False) diff --git a/pgml/tests/test_validate.py b/pgml/tests/test_validate.py deleted file mode 100644 index b7118c4b0..000000000 --- a/pgml/tests/test_validate.py +++ /dev/null @@ -1,22 +0,0 @@ -from pgml.validate import check_type -from pgml.exceptions import PgMLException - -import pytest - - -def test_check_type(): - row = { - "col1": 1, - "col2": "text", - "col3": 1.5, - } - - check_type(row) - - row = { - "col1": 1, - "col2": Exception(), - } - - with pytest.raises(PgMLException): - check_type(row) diff --git a/scikit_train_and_predict.sql b/scikit_train_and_predict.sql index 3aa93e97a..6f8b5c990 100644 --- a/scikit_train_and_predict.sql +++ b/scikit_train_and_predict.sql @@ -26,7 +26,7 @@ INSERT INTO scikit_train_data (value, weight) SELECT generate_series(1, 500), 5. CREATE OR REPLACE FUNCTION scikit_learn_train_example() -RETURNS TEXT +RETURNS BYTEA AS $$ from sklearn.ensemble import RandomForestClassifier import pickle @@ -45,27 +45,27 @@ AS $$ rfc = RandomForestClassifier() rfc.fit(X, y) - with open("/app/models/postgresml-rfc.pickle", "wb") as f: - pickle.dump(rfc, f) - return "OK" + return pickle.dumps(rfc) $$ LANGUAGE plpython3u; -SELECT scikit_learn_train_example(); +; -CREATE OR REPLACE FUNCTION scikit_learn_predict_example(value INT) +CREATE OR REPLACE FUNCTION scikit_learn_predict_example(model BYTEA, value INT) RETURNS DOUBLE PRECISION AS $$ import pickle - with open("/app/models/postgresml-rfc.pickle", "rb") as f: - m = pickle.load(f) + m = pickle.loads(model) r = m.predict([[value,]]) return r[0] $$ LANGUAGE plpython3u; +WITH model as ( + SELECT scikit_learn_train_example() AS pickle +) SELECT value, weight, - scikit_learn_predict_example(value::int) AS prediction + scikit_learn_predict_example((SELECT model.pickle FROM model), value::int) AS prediction FROM scikit_train_view LIMIT 5; diff --git a/sql/install.sql b/sql/install.sql index a34757dbf..b2758cda5 100644 --- a/sql/install.sql +++ b/sql/install.sql @@ -1,108 +1,123 @@ +SET client_min_messages TO WARNING; -- Create the PL/Python3 extension. CREATE EXTENSION IF NOT EXISTS plpython3u; +--- +--- Create schema for models. +--- DROP SCHEMA pgml CASCADE; CREATE SCHEMA IF NOT EXISTS pgml; ---- ---- Extension version. ---- -CREATE OR REPLACE FUNCTION pgml.version() -RETURNS TEXT +CREATE OR REPLACE FUNCTION pgml.auto_updated_at(tbl regclass) +RETURNS VOID AS $$ - import pgml - return pgml.version() -$$ LANGUAGE plpython3u; + DECLARE name_parts TEXT[]; + DECLARE name TEXT; +BEGIN + name_parts := string_to_array(tbl::TEXT, '.'); + name := name_parts[array_upper(name_parts, 1)]; + + EXECUTE format('DROP TRIGGER IF EXISTS %s_auto_updated_at ON %s', name, tbl); + EXECUTE format('CREATE TRIGGER %s_auto_updated_at BEFORE UPDATE ON %s + FOR EACH ROW EXECUTE PROCEDURE pgml.set_updated_at()', name, tbl); +END; +$$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION pgml.set_updated_at() +RETURNS TRIGGER +AS $$ +BEGIN + IF ( + NEW IS DISTINCT FROM OLD + AND NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at + ) THEN + NEW.updated_at := clock_timestamp(); + END IF; + RETURN new; +END; +$$ +LANGUAGE plpgsql; + +CREATE TABLE pgml.projects( + id BIGSERIAL PRIMARY KEY, + name TEXT NOT NULL, + objective TEXT NOT NULL, + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp(), + updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp() +); +SELECT pgml.auto_updated_at('pgml.projects'); +CREATE UNIQUE INDEX projects_name_idx ON pgml.projects(name); ---- ---- Track table versions. ---- -CREATE TABLE pgml.model_versions( +CREATE TABLE pgml.snapshots( + id BIGSERIAL PRIMARY KEY, + relation_name TEXT NOT NULL, + y_column_name TEXT NOT NULL, + test_size FLOAT4 NOT NULL, + test_sampling TEXT NOT NULL, + status TEXT NOT NULL, + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp(), + updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp() +); +SELECT pgml.auto_updated_at('pgml.snapshots'); + +CREATE TABLE pgml.models( id BIGSERIAL PRIMARY KEY, - name VARCHAR, - location VARCHAR NULL, - data_source TEXT, - y_column VARCHAR, - started_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, - ended_at TIMESTAMP WITHOUT TIME ZONE NULL, + project_id BIGINT NOT NULL, + snapshot_id BIGINT NOT NULL, + algorithm_name TEXT NOT NULL, + status TEXT NOT NULL, + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp(), + updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp(), mean_squared_error DOUBLE PRECISION, r2_score DOUBLE PRECISION, - successful BOOL NULL + pickle BYTEA, + CONSTRAINT project_id_fk FOREIGN KEY(project_id) REFERENCES pgml.projects(id), + CONSTRAINT snapshot_id_fk FOREIGN KEY(snapshot_id) REFERENCES pgml.snapshots(id) ); +CREATE INDEX models_project_id_created_at_idx ON pgml.models(project_id, created_at); +SELECT pgml.auto_updated_at('pgml.models'); + +CREATE TABLE pgml.deployments( + project_id BIGINT NOT NULL, + model_id BIGINT NOT NULL, + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT clock_timestamp(), + CONSTRAINT project_id_fk FOREIGN KEY(project_id) REFERENCES pgml.projects(id), + CONSTRAINT model_id_fk FOREIGN KEY(model_id) REFERENCES pgml.models(id) +); +CREATE INDEX deployments_project_id_created_at_idx ON pgml.deployments(project_id, created_at); +SELECT pgml.auto_updated_at('pgml.deployments'); + --- ---- Run some validations on the table/view to make sure ---- it'll work without our package. +--- Extension version --- -CREATE OR REPLACE FUNCTION pgml.validate(table_name TEXT) -RETURNS BOOL +CREATE OR REPLACE FUNCTION pgml.version() +RETURNS TEXT AS $$ - from pgml.sql import all_rows - from pgml.validate import check_type - - for row in all_rows(plpy.cursor(f"SELECT * FROM {table_name}")): - check_type(row) - return True + import pgml + return pgml.version() $$ LANGUAGE plpython3u; --- ---- Train the model. +--- Regression --- -CREATE OR REPLACE FUNCTION pgml.train(table_name TEXT, y TEXT) -RETURNS TEXT +CREATE OR REPLACE FUNCTION pgml.train(project_name TEXT, objective TEXT, relation_name TEXT, y_column_name TEXT) +RETURNS VOID AS $$ - from pgml.train import train - from pgml.sql import models_directory - import os + from pgml.model import train - data_source = f"SELECT * FROM {table_name}" - - # Start training. - start = plpy.execute(f""" - INSERT INTO pgml.model_versions - (name, data_source, y_column) - VALUES - ('{table_name}', '{data_source}', '{y}') - RETURNING *""", 1) - - id_ = start[0]["id"] - name = f"{table_name}_{id_}" - - destination = models_directory(plpy) - - # Train! - location, msq, r2 = train(plpy.cursor(data_source), y_column=y, name=name, destination=destination) - - plpy.execute(f""" - UPDATE pgml.model_versions - SET location = '{location}', - successful = true, - mean_squared_error = '{msq}', - r2_score = '{r2}', - ended_at = clock_timestamp() - WHERE id = {id_}""") - - return name + train(project_name, objective, relation_name, y_column_name) $$ LANGUAGE plpython3u; - --- --- Predict --- -CREATE OR REPLACE FUNCTION pgml.score(model_name TEXT, VARIADIC features DOUBLE PRECISION[]) +CREATE OR REPLACE FUNCTION pgml.predict(project_name TEXT, VARIADIC features DOUBLE PRECISION[]) RETURNS DOUBLE PRECISION AS $$ - from pgml.sql import models_directory - from pgml.score import load - import pickle - - if model_name in SD: - model = SD[model_name] - else: - SD[model_name] = load(model_name, models_directory(plpy)) - model = SD[model_name] + from pgml.model import Project - scores = model.predict([features,]) - return scores[0] + return Project.find_by_name(project_name).deployed_model.predict([features,])[0] $$ LANGUAGE plpython3u; diff --git a/sql/test.sql b/sql/test.sql index 3268d83b1..7522f83ec 100644 --- a/sql/test.sql +++ b/sql/test.sql @@ -6,20 +6,14 @@ SELECT pgml.version(); --- Valiate our wine data. -SELECT pgml.validate('wine_quality_red'); - --- Train twice -SELECT pgml.train('wine_quality_red', 'quality'); +\timing -SELECT * FROM pgml.model_versions; +SELECT pgml.train('Red Wine Scores', 'regression', 'wine_quality_red', 'quality'); +SELECT pgml.predict('Red Wine Scores', 7.4, 0.7, 0, 1.9, 0.076, 11, 34, 0.99, 2, 0.5, 9.4); +SELECT pgml.predict('Red Wine Scores', 6.4, 0.7, 0, 1.9, 0.076, 11, 34, 0.99, 2, 0.5, 9.4); +SELECT pgml.predict('Red Wine Scores', 5.4, 0.7, 0, 1.9, 0.076, 11, 34, 0.99, 2, 0.5, 9.4); +SELECT pgml.predict('Red Wine Scores', 3.4, 0.7, 0, 1.9, 0.076, 11, 34, 0.99, 2, 0.5, 9.4); -\timing -WITH latest_model AS ( - SELECT name || '_' || id AS model_name FROM pgml.model_versions ORDER BY id DESC LIMIT 1 -) -SELECT pgml.score( - (SELECT model_name FROM latest_model), -- last model we just trained +SELECT pgml.train('Red Wine Categories', 'classification', 'wine_quality_red', 'quality'); +SELECT pgml.predict('Red Wine Categories', 7.4, 0.7, 0, 1.9, 0.076, 11, 34, 0.99, 2, 0.5, 9.4); - -- features as variadic arguments - 7.4, 0.7, 0, 1.9, 0.076, 11, 34, 0.99, 2, 0.5, 9.4) AS score;








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgresml/postgresml/pull/1.diff

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy