Data Warehouse Integration¶
Warning
SQL Ingest is an experimental feature.
Part of your data might live in databases or warehouses, and retrieving it and injecting it into Pachyderm requires some level of integration.
Our SQL Ingest tool provides a seamless connection between databases and Pachyderm, allowing you to import data from a SQL database into Pachyderm-powered pipelines. By bringing data-driven pipelines, versioning, and lineage to structured data, we let Data Science teams easily combine structured and unstructured data.
Specifically, SQL Ingest connects to a remote database of your choice and pulls the result of a given query at regular intervals, in the form of a CSV or JSON file.
Use SQL Ingest¶
Pachyderm's SQL Ingest uses a jsonnet pipeline spec with the following parameters to automatically create the pipelines that access, query, and materialize the results of a SQL query against a data warehouse. The results can take the form of CSV or JSON files. Check the Formatting section at the bottom of the page for details on formats and SQL datatypes.
Pass in the following parameters to get your results committed to an output repo, ready for any downstream pipeline:
pachctl update pipeline --jsonnet https://raw.githubusercontent.com/pachyderm/pachyderm/2.2.x/src/templates/sql_ingest_cron.jsonnet \
--arg name=myingest \
--arg url="mysql://root@mysql:3306/test_db" \
--arg query="SELECT * FROM test_data" \
--arg cronSpec="@every 30s" \
--arg secretName="mysql-creds" \
--arg format=json
Where the parameters passed to the jsonnet pipeline spec are:
Parameter | Description |
---|---|
name | The name of the output repo in which your query results will materialize. |
url | The connection string to the database. |
query | The SQL query that will be run against your database. |
cronSpec | How often to run the query. For example "@every 60s" . |
format | The type of your output file containing the results of your query (either json or csv). |
secretName | The Kubernetes secret name that contains the password to the database. |
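Once the results land in the output repo, any downstream pipeline can use that repo as its input. As a minimal, hypothetical sketch (the pipeline name myingest-processor, the alpine image, and the copy command are illustrative placeholders, not part of SQL Ingest), a downstream pipeline consuming the myingest repo could be created with:
pachctl create pipeline <<EOF
{
  "pipeline": {"name": "myingest-processor"},
  "input": {"pfs": {"repo": "myingest", "glob": "/*"}},
  "transform": {
    "image": "alpine:3.16",
    "cmd": ["/bin/sh", "-c", "cp /pfs/myingest/* /pfs/out/"]
  }
}
EOF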
Example
In this example, we are leveraging Snowflake's support for queries traversing semi-structured data (here, JSON).
- Find the documentation for the support of semi-structured data in Snowflake here.
- The query in the following example will use the WEATHER schema in the public test database SNOWFLAKE_SAMPLE_DATA in the COMPUTE_WH warehouse. The column V of the table DAILY_14_TOTAL stores JSON files. Note the references to the JSON dataset elements by their hierarchical paths in the query:
pachctl update pipeline --jsonnet https://raw.githubusercontent.com/pachyderm/pachyderm/2.2.x/src/templates/sql_ingest_cron.jsonnet \
--arg name=mysnowflakeingest \
--arg url="snowflake://username@VCNYTW-MH64356/SNOWFLAKE_SAMPLE_DATA/WEATHER?warehouse=COMPUTE_WH" \
--arg query="select T, V:city.name, V:data[0].weather[0].description as morning, V:data[12].weather[0].description as pm FROM DAILY_14_TOTAL LIMIT 1" \
--arg cronSpec="@every 30s" \
--arg secretName="snowflakesecret" \
--arg format=json
Note
pachctl update pipeline will create pipelines if none exist, or update your existing pipelines otherwise.
When the command is run, the database will be queried on the schedule defined in your cronSpec parameter and a result file committed to the output repo named after the name parameter.
Database Secret¶
Before you create your SQL Ingest pipelines, make sure to create a generic secret containing your database password in the field PACHYDERM_SQL_PASSWORD.
Example
apiVersion: v1
kind: Secret
metadata:
  name: mysql-creds
data:
  "PACHYDERM_SQL_PASSWORD": "cm9vdA==" # base64 encoded
TL;DR
- Run the following command to generate your secret:
kubectl create secret generic <secret-name> --from-literal=PACHYDERM_SQL_PASSWORD=<password-to-warehouse> --dry-run=client --output=json > yourwarehousesecret.json
- Then apply it to your Pachyderm cluster:
pachctl create secret -f yourwarehousesecret.json
- The list returned by kubectl get secret should feature the secret name.
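For example, using the secret name and password from the manifest above (the file name mysqlsecret.json is arbitrary):
kubectl create secret generic mysql-creds --from-literal=PACHYDERM_SQL_PASSWORD=root --dry-run=client --output=json > mysqlsecret.json
pachctl create secret -f mysqlsecret.json
kubectl get secret mysql-creds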
Database Connection URL¶
Pachyderm's SQL Ingest takes a URL as its connection string to the database of your choice.
The URL is structured as follows:
<protocol>://<username>@<host>:<port>/<database>?<param1>=<value1>&<param2>=<value2>
Where:
Parameter | Description |
---|---|
protocol | The name of the database protocol. As of today, we support: postgres and postgresql (connect to PostgreSQL or compatible, for example Redshift), mysql (connect to MySQL or compatible, for example MariaDB), and snowflake (connect to Snowflake). |
username | The user used to access the database. |
host | The hostname of your database instance. |
port | The port number your instance is listening on. |
database | The name of the database to connect to. |
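For example (the usernames, hostnames, and database names below are illustrative placeholders):
postgresql://myuser@postgres-host:5432/my_database
mysql://root@mysql:3306/test_db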
Snowflake users will need a variant of the URL above.
Pachyderm supports two connection URL patterns to query Snowflake:
snowflake://username@<account_identifier>/<db_name>/<schema_name>?warehouse=<warehouse_name>
snowflake://username@hostname:port/<db_name>/<schema_name>?account=<account_identifier>&warehouse=<warehouse_name>
where:
- The account_identifier takes one of the following forms for most URLs:
  - Option 1 - Account Name: organization_name-account_name
  - Option 2 - Account Locator: account_locator.region.cloud
  In both cases, if you are used to connecting to Snowflake via a URL such as https://account_identifier.snowflakecomputing.com, you can use the full domain name account_identifier.snowflakecomputing.com in the url.
- db_name and schema_name are respectively the Database Name and the Schema (namespace) targeted.
- Additionally, a warehouse, or "compute resource", is required for all queries. Pass your warehouse as a parameter to the url: warehouse=<warehouse_name>.
Here is an example of a connection string to Snowflake:
"snowflake://username@GVCNYTW-MH64356/SNOWFLAKE_SAMPLE_DATA/WEATHER?warehouse=COMPUTE_WH"
Note
- The password is not included in the URL. It is retrieved from a Kubernetes secret or a file on disk at the time of the query.
- The additional parameters (<param1>=<value1>) are optional and specific to the driver. For example, Snowflake requires you to pass the warehouse as a parameter: warehouse=<your-warehouse>.
How Does This Work?¶
SQL Ingest's jsonnet pipeline spec sql_ingest_cron.jsonnet creates two pipelines:
- A Cron Pipeline myingest_queries triggering at an interval set by cronSpec and outputting a file /0000 in its output repo myingest_queries. /0000 contains a timestamp and the SQL statement set in query.
- The following pipeline myingest takes the /0000 file as input and runs the query against the database set in url. The query's result is then materialized in a file (JSON or CSV) of the same name /0000 committed to the output repo myingest.
Note
The name of each pipeline and related input and output repos are derived from the name parameter. In the example above, we have set --arg name=myingest.
The same base image pachtf is used in both pipelines.
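To look at the two generated pipeline specs yourself, you can inspect them (the pipeline names assume --arg name=myingest as above):
pachctl inspect pipeline myingest_queries --raw
pachctl inspect pipeline myingest --raw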
Check the visual representation of the SQL Ingest DAG created above in Console.
In your terminal:
- The list of the DAG's pipelines (pachctl list pipeline) shows the two pipelines, myingest_queries and myingest.
- Three repos are created: the two output repos (myingest_queries and myingest) plus the cron input repo that triggers the queries on schedule.
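To list them yourself:
pachctl list pipeline
pachctl list repo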
How To Inspect The Result Of A Query?¶
You have run a query using SQL Ingest. How do you inspect its result?
- Check what the query looked like:
pachctl get file myingest_queries@master:/0000
-- 1643235475
SELECT * FROM test_data
- Read the file written to the output repo myingest:
pachctl list file myingest@master
NAME  TYPE SIZE
/0000 file 52B
pachctl get file myingest@master:/0000
{"mycolumn":"hello world","id":1}
{"mycolumn":"hello you","id":2}
Formats and SQL DataTypes¶
The following comments on formatting reflect the state of this release and are subject to change.
SQL datatypes supported¶
We support the following SQL datatypes. Some of these datatypes are specific to a particular database.
Dates/Timestamps | Varchars | Numerics | Booleans |
---|---|---|---|
DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ, TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITHOUT TIME ZONE | VARCHAR, TEXT, CHARACTER VARYING | SMALLINT, INT2, INTEGER, INT, INT4, BIGINT, INT8, FLOAT, FLOAT4, FLOAT8, REAL, DOUBLE PRECISION, NUMERIC, DECIMAL, NUMBER | BOOL, BOOLEAN |
Formatting¶
- All numeric values are converted into strings in your CSV and JSON.
Warning
- Note that infinite (Inf) and not a number (NaN) values will also be stored as strings in JSON files.
- Use this format #.# for all decimals that you plan to egress back to a database.
Examples
Database | CSV | JSON |
---|---|---|
12345 | 12345 | "12345" |
123.45 | 123.45 | "123.45" |
- Date/Timestamps
Examples
Type | Database | CSV | JSON |
---|---|---|---|
Date | 2022-05-09 | 2022-05-09T00:00:00 | "2022-05-09T00:00:00" |
Timestamp ntz | 2022-05-09 16:43:00 | 2022-05-09T16:43:00 | "2022-05-09T16:43:00" |
Timestamp tz | 2022-05-09 16:43:00-05:00 | 2022-05-09T16:43:00-05:00 | "2022-05-09T16:43:00-05:00" |
- Strings
Keep in mind when parsing your CSVs in your user code that we escape " with "" in CSV files.
Examples
Database | CSV |
---|---|
"null" | null |
`""` | """""" |
"" | "" |
nil |  |
"my string" | """my string""" |
"this will be enclosed in quotes because it has a ," | "this will be enclosed in quotes because it has a ," |