Contents
pg_clickhouse Tutorial
Overview
This tutorial follows the [ClickHouse tutorial] but runs all its queries via pg_clickhouse.
Start ClickHouse
First, create a ClickHouse database if you don’t already have one. A quick way to start is with the Docker image:
docker run -d --network host --name clickhouse -p 8123:8123 -p9000:9000 --ulimit nofile=262144:262144 clickhouse
docker exec -it clickhouse clickhouse-client
Create a Table
Let’s borrow from the [ClickHouse tutorial] to create a simple database with The New York City taxi dataset:
CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
Add the Data Set
And then import the data:
INSERT INTO taxi.trips
SELECT * FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
'TabSeparatedWithNames', "
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0
Make sure we can query it then quit the client:
SELECT count() FROM taxi.trips;
quit
Install pg_clickhouse
Build and install pg_clickhouse from PGXN or GitHub. Or spin up a Docker container using the pg_clickhouse image, which simply adds pg_clickhouse to the Docker Postgres image:
docker run --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
-d ghcr.io/clickhouse/pg_clickhouse:18 -U postgres
Connect pg_clickhouse
Now connect to Postgres and create pg_clickhouse:
CREATE EXTENSION pg_clickhouse;
Create a foreign server using the host name, port, and database for your ClickHouse database.
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
Here we’ve elected to use the binary driver, which uses the ClickHouse binary protocol. You can also use the “http” driver, which uses the HTTP interface.
Next, map a PostgreSQLu user to a ClickHouse user. The simplest way to do so is just to map the current PostgreSQL user to a remote user for the foreign server:
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (user 'default');
You can also specify a password option.
Now, add the taxi table, just import it all of the tables from the remote ClickHouse database into a Postgres schema:
CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;
And now the table should be imported: In psql, use \det+ to see it:
taxi=# \det+ taxi.*
List of foreign tables
Schema | Table | Server | FDW options | Description
--------+-------+----------+-----------------------------------------------------------+-------------
taxi | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 row)
Success! Use \d to show all the columns:
taxi=# \d taxi.trips
Foreign table "taxi.trips"
Column | Type | Collation | Nullable | Default | FDW options
-----------------------+-----------------------------+-----------+----------+---------+-------------
trip_id | bigint | | not null | |
vendor_id | text | | not null | |
pickup_date | date | | not null | |
pickup_datetime | timestamp without time zone | | not null | |
dropoff_date | date | | not null | |
dropoff_datetime | timestamp without time zone | | not null | |
store_and_fwd_flag | smallint | | not null | |
rate_code_id | smallint | | not null | |
pickup_longitude | double precision | | not null | |
pickup_latitude | double precision | | not null | |
dropoff_longitude | double precision | | not null | |
dropoff_latitude | double precision | | not null | |
passenger_count | smallint | | not null | |
trip_distance | double precision | | not null | |
fare_amount | numeric(10,2) | | not null | |
extra | numeric(10,2) | | not null | |
mta_tax | numeric(10,2) | | not null | |
tip_amount | numeric(10,2) | | not null | |
tolls_amount | numeric(10,2) | | not null | |
ehail_fee | numeric(10,2) | | not null | |
improvement_surcharge | numeric(10,2) | | not null | |
total_amount | numeric(10,2) | | not null | |
payment_type | text | | not null | |
trip_type | smallint | | not null | |
pickup | character varying(25) | | not null | |
dropoff | character varying(25) | | not null | |
cab_type | text | | not null | |
pickup_nyct2010_gid | smallint | | not null | |
pickup_ctlabel | real | | not null | |
pickup_borocode | smallint | | not null | |
pickup_ct2010 | text | | not null | |
pickup_boroct2010 | text | | not null | |
pickup_cdeligibil | text | | not null | |
pickup_ntacode | character varying(4) | | not null | |
pickup_ntaname | text | | not null | |
pickup_puma | integer | | not null | |
dropoff_nyct2010_gid | smallint | | not null | |
dropoff_ctlabel | real | | not null | |
dropoff_borocode | smallint | | not null | |
dropoff_ct2010 | text | | not null | |
dropoff_boroct2010 | text | | not null | |
dropoff_cdeligibil | text | | not null | |
dropoff_ntacode | character varying(4) | | not null | |
dropoff_ntaname | text | | not null | |
dropoff_puma | integer | | not null | |
Server: taxi_srv
FDW options: (database 'taxi', table_name 'trips', engine 'MergeTree')
Now query the table:
SELECT count(*) FROM taxi.trips;
count
---------
1999657
(1 row)
Note how quickly the query executed. pg_clickhouse pushes down the entire
query, including the COUNT() aggregate, so it runs on ClickHouse and only
returns the single row to Postgres. Use EXPLAIN to see it:
EXPLAIN select count(*) from taxi.trips;
QUERY PLAN
-------------------------------------------------
Foreign Scan (cost=1.00..-0.90 rows=1 width=8)
Relations: Aggregate on (trips)
(2 rows)
Note that “Foreign Scan” appears at the root of the plan, meaning that the entire query was pushed down to ClickHouse.
Analyze the data
Run some queries to analyze the data. Explore the following examples or try your own SQL query.
-
Calculate the average tip amount:
taxi=# \timing Timing is on. taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips; round ------- 1.68 (1 row) Time: 9.438 ms -
Calculate the average cost based on the number of passengers:
taxi=# SELECT passenger_count, avg(total_amount)::NUMERIC(10, 2) AS average_total_amount FROM taxi.trips GROUP BY passenger_count; passenger_count | average_total_amount -----------------+---------------------- 0 | 22.68 1 | 15.96 2 | 17.14 3 | 16.75 4 | 17.32 5 | 16.34 6 | 16.03 7 | 59.79 8 | 36.40 9 | 9.79 (10 rows) Time: 27.266 ms -
Calculate the daily number of pickups per neighborhood:
taxi=# SELECT pickup_date, pickup_ntaname, SUM(1) AS number_of_trips FROM taxi.trips GROUP BY pickup_date, pickup_ntaname ORDER BY pickup_date ASC LIMIT 10; pickup_date | pickup_ntaname | number_of_trips -------------+--------------------------------+----------------- 2015-07-01 | Williamsburg | 1 2015-07-01 | park-cemetery-etc-Queens | 6 2015-07-01 | Maspeth | 1 2015-07-01 | Stuyvesant Town-Cooper Village | 44 2015-07-01 | Rego Park | 1 2015-07-01 | Greenpoint | 7 2015-07-01 | Highbridge | 1 2015-07-01 | Briarwood-Jamaica Hills | 3 2015-07-01 | Airport | 550 2015-07-01 | East Harlem North | 32 (10 rows) Time: 30.978 ms -
Calculate the length of each trip in minutes, then group the results by trip length:
taxi=# SELECT avg(tip_amount) AS avg_tip, avg(fare_amount) AS avg_fare, avg(passenger_count) AS avg_passenger, count(*) AS count, round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes FROM taxi.trips WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0 GROUP BY trip_minutes ORDER BY trip_minutes DESC LIMIT 5; avg_tip | avg_fare | avg_passenger | count | trip_minutes -------------------+------------------+------------------+-------+-------------- 1.96 | 8 | 1 | 1 | 27512 0 | 12 | 2 | 1 | 27500 0.562727272727273 | 17.4545454545455 | 2.45454545454545 | 11 | 1440 0.716564885496183 | 14.2786259541985 | 1.94656488549618 | 131 | 1439 1.00945205479452 | 12.8787671232877 | 1.98630136986301 | 146 | 1438 (5 rows) Time: 45.477 ms -
Show the number of pickups in each neighborhood broken down by hour of the day:
taxi=# SELECT pickup_ntaname, date_part('hour', pickup_datetime) as pickup_hour, SUM(1) AS pickups FROM taxi.trips WHERE pickup_ntaname != '' GROUP BY pickup_ntaname, pickup_hour ORDER BY pickup_ntaname, date_part('hour', pickup_datetime) LIMIT 5; pickup_ntaname | pickup_hour | pickups ----------------+-------------+--------- Airport | 0 | 3509 Airport | 1 | 1184 Airport | 2 | 401 Airport | 3 | 152 Airport | 4 | 213 (5 rows) Time: 36.895 ms -
Retrieve rides to LaGuardia or JFK airports:
taxi=# SELECT pickup_datetime, dropoff_datetime, total_amount, pickup_nyct2010_gid, dropoff_nyct2010_gid, CASE WHEN dropoff_nyct2010_gid = 138 THEN 'LGA' WHEN dropoff_nyct2010_gid = 132 THEN 'JFK' END AS airport_code, EXTRACT(YEAR FROM pickup_datetime) AS year, EXTRACT(DAY FROM pickup_datetime) AS day, EXTRACT(HOUR FROM pickup_datetime) AS hour FROM taxi.trips WHERE dropoff_nyct2010_gid IN (132, 138) ORDER BY pickup_datetime LIMIT 5; pickup_datetime | dropoff_datetime | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour ---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------ 2015-07-01 00:04:14 | 2015-07-01 00:15:29 | 13.30 | -34 | 132 | JFK | 2015 | 1 | 0 2015-07-01 00:09:42 | 2015-07-01 00:12:55 | 6.80 | 50 | 138 | LGA | 2015 | 1 | 0 2015-07-01 00:23:04 | 2015-07-01 00:24:39 | 4.80 | -125 | 132 | JFK | 2015 | 1 | 0 2015-07-01 00:27:51 | 2015-07-01 00:39:02 | 14.72 | -101 | 138 | LGA | 2015 | 1 | 0 2015-07-01 00:32:03 | 2015-07-01 00:55:39 | 39.34 | 48 | 138 | LGA | 2015 | 1 | 0 (5 rows) Time: 17.450 ms
Create a Dictionary
Create a dictionary associated with a table in your ClickHouse service. The table and dictionary are based on a CSV file that contains a row for each neighborhood in New York City.
The neighborhoods are mapped to the names of the five New York City boroughs (Bronx, Brooklyn, Manhattan, Queens and Staten Island), as well as Newark Airport (EWR).
Here’s an excerpt from the CSV file you’re using in table format. The
LocationID column in the file maps to the pickup_nyct2010_gid and
dropoff_nyct2010_gid columns in your trips table:
| LocationID | Borough | Zone | service_zone | | ———: | ———–– | ———————– | ———— | | 1 | EWR | Newark Airport | EWR | | 2 | Queens | Jamaica Bay | Boro Zone | | 3 | Bronx | Allerton/Pelham Gardens | Boro Zone | | 4 | Manhattan | Alphabet City | Yellow Zone | | 5 | Staten Island | Arden Heights | Boro Zone |
-
Still in Postgres, use the
clickhouse_raw_queryfunction to create a ClickHouse dictionary namedtaxi_zone_dictionaryand populate the dictionary from the CSV file in S3:SELECT clickhouse_raw_query($$ CREATE DICTIONARY taxi.taxi_zone_dictionary ( LocationID Int64 DEFAULT 0, Borough String, zone String, service_zone String ) PRIMARY KEY LocationID SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 0) LAYOUT(HASHED_ARRAY()) $$, 'host=localhost dbname=taxi');[!NOTE] Setting
LIFETIMEto 0 disables automatic updates to avoid unnecessary traffic to our S3 bucket. In other cases, you might configure it differently. For details, see Refreshing dictionary data using LIFETIME.- Now import it:
IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary) FROM SERVER taxi_srv INTO taxi;- Confirm we can query it:
taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3; LocationID | Borough | Zone | service_zone ------------+-----------+-----------------------------------------------+-------------- 77 | Brooklyn | East New York/Pennsylvania Avenue | Boro Zone 106 | Brooklyn | Gowanus | Boro Zone 103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone (3 rows)- Excellent. Now use the
dictGetfunction unction to retrieve a borough’s name in a query. For this query sums up the number of taxi rides per borough that end at either the LaGuardia or JFK airport:
taxi=# SELECT count(1) AS total, COALESCE(NULLIF(dictGet( 'taxi.taxi_zone_dictionary', 'Borough', toUInt64(pickup_nyct2010_gid) ), ''), 'Unknown') AS borough_name FROM taxi.trips WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138 GROUP BY borough_name ORDER BY total DESC; total | borough_name -------+--------------- 23683 | Unknown 7053 | Manhattan 6828 | Brooklyn 4458 | Queens 2670 | Bronx 554 | Staten Island 53 | EWR (7 rows) Time: 66.245 msThis query sums up the number of taxi rides per borough that end at either the LaGuardia or JFK airport. Notice there are quite a few trips where the pickup neighborhood is unknown.
Perform a join
Write some queries that join the taxi_zone_dictionary with your trips
table.
-
Start with a simple
JOINthat acts similarly to the previous airport query above:taxi=# SELECT count(1) AS total, "Borough" FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID") WHERE pickup_nyct2010_gid > 0 AND dropoff_nyct2010_gid IN (132, 138) GROUP BY "Borough" ORDER BY total DESC; total | borough_name -------+--------------- 7053 | Manhattan 6828 | Brooklyn 4458 | Queens 2670 | Bronx 554 | Staten Island 53 | EWR (6 rows) Time: 48.449 ms[!NOTE] Notice the output of the above
JOINquery is the same as thedictGetquery above, (except that theUnknownvalues are not included). Behind the scenes, ClickHouse is actually calling thedictGetfunction for thetaxi_zone_dictionarydictionary, but theJOINsyntax is more familiar for SQL developers.taxi=# explain SELECT count(1) AS total, "Borough" FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID") WHERE pickup_nyct2010_gid > 0 AND dropoff_nyct2010_gid IN (132, 138) GROUP BY "Borough" ORDER BY total DESC; QUERY PLAN ----------------------------------------------------------------------- Foreign Scan (cost=1.00..5.10 rows=1000 width=40) Relations: Aggregate on ((trips) INNER JOIN (taxi_zone_dictionary)) (2 rows) Time: 2.012 ms -
This query returns rows for the the 1000 trips with the highest tip amount, then performs an inner join of each row with the dictionary:
taxi=# SELECT * FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID" WHERE tip_amount > 0 ORDER BY tip_amount DESC LIMIT 1000;
[!NOTE] Generally, we avoid using
SELECT *in PostgreSQL and ClickHouse. You should only retrieve the columns you actually need.