Some days ago a coworker of mine linked me to the source code of Google’s exposure notifications API and of course is a good chance to see how Google is doing Go related codebases.

Continuing with the topic of Postgres and benchmarking, I decided to check out how Postgres configuration was done and this line got my attention:

import (
	...
	"github.com/jackc/pgx/v4/pgxpool"
)

What is pgx, and why should we care?

Pgx is a library that implements postgres connection without relying on the database/sql standard package. The source code can be found here; and as the readme states:

pgx aims to be low-level, fast, and performant, while also enabling PostgreSQL-specific features that the standard database/sql package does not allow for.

pgx supports many features beyond what is available through database/sql:

  • […]
  • Automatic statement preparation and caching
  • Batch queries
  • Single-round trip query mode
  • Connection pool with after-connect hook for arbitrary connection setup

Automatic statement preparation and caching - pgx will prepare and cache statements by default. This can provide an significant free improvement to code that does not explicitly use prepared statements. Under certain workloads, it can perform nearly 3x the number of queries per second.

Batched queries - Multiple queries can be batched together to minimize network round trips.

And these are just a bunch of things picked from the readme, but there are some strong statements which are analyzed below to see if they are true or not.

Benchmarking

On my previous post Benchmarking concurrently safe upsert queries I showed an application using lib/pq and benchmark for different queries, all trying to solve the same scenario. In order not to make the post larger than necessary, please check the details on the link above.

In this post, the application shown before will be used and we will cover:

  • How to migrate from lib/pq to pgx.
  • Compare the benchmark between both.

Without further ado, let’s begin.

Connection handling

On the lib/pq version, database/sql is being used to create a struct holding a *sql.Db variable which was being used for every call to the database.

type pgCustomerRepo struct {
	dbHandler *sql.DB
}

func NewPgCustomerRepo(connString string) (*pgCustomerRepo, error) {
	db, err := sql.Open("postgres", connString)
	if err != nil {
		return nil, err
	}
	return &pgCustomerRepo{
		dbHandler: db,
	}, nil
}

Using pgx this is changed to:

type pgCustomerRepo struct {
	dbHandler *pgxpool.Pool
}

func NewPgCustomerRepo(connString string) (*pgCustomerRepo, error) {
	config, err := pgxpool.ParseConfig(connString)
	if err != nil {
		return nil, err
	}
	config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
		conn.ConnInfo().RegisterDataType(pgtype.DataType{
			Value: &MyTid{&pgtype.TID{}},
			Name:  "tid",
			OID:   pgtype.TIDOID,
		})
		return nil
	}

	db, err := pgxpool.ConnectConfig(context.Background(), config)
	if err != nil {
		return nil, err
	}

	return &pgCustomerRepo{
		dbHandler: db,
	}, nil
}

Since the idea is to try out the pooling mechanism, a pool configuration is created and used for the connection. This is fairly simple, but there is more than just that in the code.

The function AfterConnect allows the developer to hook custom behaviors into the database connection. As for this example a custom mapping is added to map a TID field to a string. TID is the Postgres type we get when we query SELECT ctid.

It is understandable that a real world application will not fetch this type of data thus the mapping was not implemented. For this I’ve submitted a PR but it did not reach a release tag at the moment I am writing this; so I added the custom hook in the code.

Again, in a real world application it is likely that an AfterConnect hook is not needed, but it is valuable knowing that you can use it if you need it.

With the connection covered, let’s move to the queries:

Advisory Lock

The changes to the code are barely there. All of the pgx functions that go against the database expect a context and there is none non-context alternative.

tx, err := repo.dbHandler.BeginTx(ctx, nil)
-->
tx, err := repo.dbHandler.BeginTx(ctx, pgx.TxOptions{})


_, err = tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock($1)", hash(id.String()))
-->
_, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", hash(id.String()))


r := tx.QueryRowContext(ctx, "SELECT ctid, xmax, id FROM customers WHERE customer_id = $1", id)
err = r.Scan(&res.CTID, &res.XMAX, &res.ID)
-->
err = tx.QueryRow(ctx, "SELECT ctid, xmax, id FROM customers WHERE customer_id = $1", id).Scan(&res.CTID, &res.XMAX, &res.ID)


q := "INSERT INTO customers(customer_id) VALUES($1) RETURNING ctid, xmax, id"
row := tx.QueryRowContext(ctx, q, id)
err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
-->
q := "INSERT INTO customers(customer_id) VALUES($1) RETURNING ctid, xmax, id"
err = tx.QueryRow(ctx, q, id).Scan(&res.CTID, &res.XMAX, &res.ID)


err = tx.Commit()
-->
err = tx.Commit(ctx)


err = tx.Rollback()
-->
err = tx.Rollback(ctx)

As for the performance:

LIB/PQ: EXECUTING FOR LOCKS
Bucket         #     %       Histogram
[0s,    2ms]   974   24.35%  ##################
[2ms,   4ms]   2771  69.27%  ###################################################
[4ms,   6ms]   244   6.10%   ####
[6ms,   8ms]   10    0.25%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  1     0.03%
[15ms,  +Inf]  0     0.00%

PGX: EXECUTING FOR LOCKS
Bucket         #     %       Histogram
[0s,    2ms]   2442  61.05%  #############################################
[2ms,   4ms]   1506  37.65%  ############################
[4ms,   6ms]   38    0.95%
[6ms,   8ms]   11    0.27%
[8ms,   10ms]  2     0.05%
[10ms,  15ms]  1     0.03%
[15ms,  +Inf]  0     0.00%

And this is awesome! Out of the box without changing too much the code, the result is already speaking for itself.

Upsert on conflict

query :=
	"INSERT INTO customers(customer_id) VALUES($1) " +
	"ON CONFLICT (customer_id) DO UPDATE SET customer_id = excluded.customer_id RETURNING ctid, xmax, id"

row := repo.dbHandler.QueryRowContext(ctx, query, id)
err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
-->
err = repo.dbHandler.QueryRow(ctx, query, id).Scan(&res.CTID, &res.XMAX, &res.ID)

Even simpler than the previous one, now there is a one liner.

Benchmark:

LIB/PQ: EXECUTING FOR UPSERT CONFLICT
Bucket         #     %       Histogram
[0s,    2ms]   1777  44.42%  #################################
[2ms,   4ms]   1938  48.45%  ####################################
[4ms,   6ms]   158   3.95%   ##
[6ms,   8ms]   72    1.80%   #
[8ms,   10ms]  33    0.83%
[10ms,  15ms]  18    0.45%
[15ms,  +Inf]  4     0.10%

PGX: EXECUTING FOR UPSERT CONFLICT
Bucket         #     %       Histogram
[0s,    2ms]   2676  66.90%  ##################################################
[2ms,   4ms]   1295  32.38%  ########################
[4ms,   6ms]   21    0.53%
[6ms,   8ms]   6     0.15%
[8ms,   10ms]  1     0.03%
[10ms,  15ms]  1     0.03%
[15ms,  +Inf]  0     0.00%

Yes, I am running the benchmark on the same computer, doing a warm up phase in which the results are discarded to be fair with the old benchmarks as well.

Upsert do nothing

As you can recall (if you read my previous post), there is a limitation when sending on the same QueryContext more than one parameterized query.

On the previous post, this was bypassed by a non good practice that was not sending parameterized queries but injecting the string; which as I stated previously it is a bad practice that can led to security vulnerabilities.

But on pgx there is a built-in support for sending a query batch in a single trip. And this definitely draw my attention since I find this approach the easiest to understand a.k.a write first, read later.

query :=
	"INSERT INTO customers(customer_id)	VALUES($1) ON CONFLICT DO NOTHING;" +
	"SELECT ctid, xmax, id FROM customers WHERE customer_id = $1"

query = strings.ReplaceAll(query, "$1", fmt.Sprintf("'%s'", id.String()))
row := repo.dbHandler.QueryRowContext(ctx, query)
err = row.Scan(&res.CTID, &res.XMAX, &res.ID)

This unsafe piece of code now can be converted to an elegant and performant way:

batch := &pgx.Batch{}
batch.Queue("INSERT INTO customers(customer_id) VALUES($1) ON CONFLICT DO NOTHING", id)
batch.Queue("SELECT ctid, xmax, id FROM customers WHERE customer_id = $1", id)
results := repo.dbHandler.SendBatch(ctx, batch)

_, err = results.Exec()
if err != nil {
	return
}

err = results.QueryRow().Scan(&res.CTID, &res.XMAX, &res.ID)
if err != nil {
	return
}

err = results.Close()

But does this truly work?

LIB/PQ: EXECUTING FOR UPSERT DO NOTHING
Bucket         #     %       Histogram
[0s,    2ms]   3426  85.65%  ################################################################
[2ms,   4ms]   572   14.30%  ##########
[4ms,   6ms]   1     0.03%
[6ms,   8ms]   0     0.00%
[8ms,   10ms]  1     0.03%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

PGX: EXECUTING FOR UPSERT DO NOTHING
Bucket         #     %       Histogram
[0s,    2ms]   3878  96.95%  ########################################################################
[2ms,   4ms]   118   2.95%   ##
[4ms,   6ms]   4     0.10%
[6ms,   8ms]   0     0.00%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

And the answer is yes, it does work. Not only it is faster, but it managed to get rid of the vulnerability issue and allows the code to send two parameterized queries in one round trip.

Now if someone asks me if I choose doing this over CTE queries for this scenario, I will lean more on this approach since it is easier to comprehend.

CTE queries

query :=
	"WITH " +
		"search AS (SELECT ctid, xmax, id FROM customers WHERE customer_id = $1 LIMIT 1)," +
		"add AS (INSERT INTO customers (customer_id) SELECT $1 WHERE NOT EXISTS(SELECT id from search) RETURNING ctid, xmax, id)" +
	"SELECT ctid, xmax, id from add	UNION ALL SELECT ctid, xmax, id from search"

row := repo.dbHandler.QueryRowContext(ctx, query, id)
err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
-->
err = repo.dbHandler.QueryRow(ctx, query, id).Scan(&res.CTID, &res.XMAX, &res.ID)

As with the other approaches, not much to change here.

Benchmark:

LIB/PQ: EXECUTING FOR UPSERT CTE
Bucket         #     %       Histogram
[0s,    2ms]   2950  73.75%  #######################################################
[2ms,   4ms]   1034  25.85%  ###################
[4ms,   6ms]   14    0.35%
[6ms,   8ms]   2     0.05%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

PGX: EXECUTING FOR UPSERT CTE
Bucket         #     %       Histogram
[0s,    2ms]   3893  97.32%  ########################################################################
[2ms,   4ms]   106   2.65%   #
[4ms,   6ms]   1     0.03%
[6ms,   8ms]   0     0.00%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

And again, the results are impressive, but one can say now:

In all of these approaches shown, the use case was to bombard the database in order to insert and fetch a customer with the same ID. Maybe the cache stops working when the parameter changes.

Let’s try it out:

routes.HandleFunc("/upsert-cte-random", s.UpsertCustomerRandom(s.CustomerRepo.UpsertCustomerCte))
func (s *Server) UpsertCustomerRandom(repoUpsert func(ctx context.Context, id uuid.UUID) (res domain.UpsertedRow, err error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		...

		// Call the repo generating a random UUID for each time
		start := time.Now()
		upserted, err := repoUpsert(r.Context(), uuid.New())
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Println(err)
			return
		}

		...
	}
}

Of course, in order to do a comparison, this new route was benchmarked using both lib/pq and pgx:

LIB/PQ: EXECUTING FOR UPSERT CTE RANDOM
Bucket         #     %       Histogram
[0s,    2ms]   1744  43.60%  ################################
[2ms,   4ms]   2208  55.20%  #########################################
[4ms,   6ms]   45    1.12%
[6ms,   8ms]   2     0.05%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  1     0.03%

PGX: EXECUTING FOR UPSERT CTE RANDOM
Bucket         #     %       Histogram
[0s,    2ms]   2643  66.07%  #################################################
[2ms,   4ms]   1356  33.90%  #########################
[4ms,   6ms]   1     0.03%
[6ms,   8ms]   0     0.00%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

And as we can see, the cache is still doing its job.

Summary

I discovered this library due to some luck but so far I have not been disappointed. I am not seeing a reason on not picking this library as default for standard CRUD applications. Of course, there might be use cases where it is not possible and then lib/pq can still do the job.

Some additional notes

  • I am not being sponsored nor paid by the author of the library.
  • The code used for benchmarking can be found here. The master branch contains the lib/pq code and the branch refactor/pgx contains the pgx code.
  • All the benchmarks can be executed easily by:
make run-db (create database and table after executing this)
make run-app
make run-bench

Last, but not least, if you consider this is useful, do not hesitate on sharing it on your favorite social media; and if there is a particular topic you want me to cover, just put it on the comments below.