In my previous post I covered how to build concurrently safe upsert queries.

This post serves as a continuation where different approaches will be battle tested to expose the performance of each one:

  • Advisory lock
  • On conflict update
  • On conflict do nothing
  • CTE queries

Without further ado, let’s start:

The application

In order to emulate the scenario described on the previous post, an HTTP API is required. This one will connect to a database to perform the calls. The application is written in Go and can be found here.

The application is plain simple, as it contains few files:

  • cmd/main.go: Responsible for bootstrapping the postgres connection and http server.
  • server/server.go: Exposes different endpoints, performs body parsing and error checking.
  • domain/domain.go: Exposes the UpsertedRow struct that will be returned by postgres.
  • postgres/postgres.go: Contains functions for each approach listed above, all with the same signature so they can be consumed easily on the server file. Yes, on an a real world app, do not inject database handlers directly on the transport layer.

Pre-requisites

Since we are going to use postgres, an instance needs to be set up, and for convenience this example uses docker:

docker run --name db -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust postgres:latest

A file named seed.psql is also present on the root of the repository:

CREATE DATABASE customer_svc;
CREATE TABLE customers(id SERIAL, customer_id UUID NOT NULL PRIMARY KEY);

Running the application

The application can be run in a container as well:

docker build -t pg-upsert-app:local .
docker run -p 8080:8080 --link db --name app -i pg-upsert-app:local

Benchmarking

For the benchmark, the tool vegeta is used. This tool allows doing HTTP load testing which will be used against the API.

The test suite is just a bash script that will feature a similar logic for each approach to be tested; example:

UPSERT_LOCK=$(echo '{
  method: "POST",
  url: "http://app:8080/upsert-lock",
  body:"{\"id\": \"<ruuid>\"}" | @base64,
  header: {"Content-Type": ["application/json"]}
}' | sed "s/<ruuid>/$(uuidgen | tr '[:upper:]' '[:lower:]')/g")
echo "EXECUTING FOR LOCKS"
jq -ncM "$UPSERT_LOCK" \
  | vegeta attack -format=json -duration=40s -connections=20 -rate=100 | vegeta encode \
  | vegeta report -type="hist[0,2ms,4ms,6ms,8ms,10ms,15ms]"

This is translated to the following:

  • Echo a POST request towards http://app:8080/upsert-lock with the payload {"id": "some-uuid-generated-at-runtime"}. This uuid will be repeated for each call performed in the given scenario.
  • Parse this echoed string as a json, and use it as an input for vegeta tool.
  • Execute requests for 40 seconds with 20 concurrent connections at a rate of 100 request per second; in other words 4000 request distributed across 20 connections for 40 seconds.
  • The results will be aggregated into different buckets determined by hist[0,2ms,4ms,6ms,8ms,10ms,15ms].

The execution of this test is also dockerized so there is no need to download additional tools:

docker build -t benchmark:local -f benchmark/Dockerfile benchmark/.
docker run --link app -t benchmark:local

In order to avoid results influenced by the warm up phase of the app, the test suite was executed twice and only the results from the last execution are shown below.

Advisory lock

As the previous post indicates this approach is the less scalable. The introduction of a custom application lock in the database is not a good design model and the results are showing that:

EXECUTING FOR LOCKS
Bucket         #     %       Histogram
[0s,    2ms]   974   24.35%  ##################
[2ms,   4ms]   2771  69.27%  ###################################################
[4ms,   6ms]   244   6.10%   ####
[6ms,   8ms]   10    0.25%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  1     0.03%
[15ms,  +Inf]  0     0.00%

Not only that but the code is also more complex than other solutions:

func (repo *pgCustomerRepo) UpsertCustomerLock(ctx context.Context, id uuid.UUID) (res domain.UpsertedRow, err error) {
	tx, err := repo.dbHandler.BeginTx(ctx, nil)
	if err != nil {
		return
	}

	_, err = tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock($1)", hash(id.String()))
	if err != nil {
		return
	}

	r := tx.QueryRowContext(ctx, "SELECT ctid, xmax, id FROM customers WHERE customer_id = $1", id)
	err = r.Scan(&res.CTID, &res.XMAX, &res.ID)

	if err != nil && err == sql.ErrNoRows {
		q := "INSERT INTO customers(customer_id) VALUES($1) RETURNING ctid, xmax, id"
		row := tx.QueryRowContext(ctx, q, id)
		err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
	}

	if err == nil {
		err = tx.Commit()
	} else {
		err = tx.Rollback()
	}

	return
}

func hash(s string) int64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return int64(h.Sum64())
}
  1. Create the transaction.
  2. Get a deterministic int64 for the given uuid, and perform the lock (subsequent calls will have to wait here until the transaction ends)
  3. Get the customer.
  4. If customer does not exist, create it and return it back.
  5. Commit the transaction.

Upsert on conflict

This one is most likely the most common solution for these kind of cases. The results are better than the previous one but it is still not a good design.

Remember, updating on conflict will cause the following side effects:

  • Old row is removed, a new is added.
  • It modifies the row transaction id, an “old” that got updated row will have a new transaction timestamp.
  • Firing triggers affecting the table.

For a better refresher, click here

EXECUTING FOR UPSERT CONFLICT
Bucket         #     %       Histogram
[0s,    2ms]   1777  44.42%  #################################
[2ms,   4ms]   1938  48.45%  ####################################
[4ms,   6ms]   158   3.95%   ##
[6ms,   8ms]   72    1.80%   #
[8ms,   10ms]  33    0.83%
[10ms,  15ms]  18    0.45%
[15ms,  +Inf]  4     0.10%

The code here is simpler, so this one feels easier:

func (repo *pgCustomerRepo) UpsertCustomerConflict(ctx context.Context, id uuid.UUID) (res domain.UpsertedRow, err error) {
	query :=
		"INSERT INTO customers(customer_id) VALUES($1) " +
		"ON CONFLICT (customer_id) DO UPDATE SET customer_id = excluded.customer_id " +
		"RETURNING ctid, xmax, id"

	row := repo.dbHandler.QueryRowContext(ctx, query, id)
	err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
	return
}
  1. Insert the customer.
  2. If customer exists, update (delete + create) the row overriding the id with the same id.
  3. Get back the data.

Upsert do nothing

The following approach has its pros and cons. The pros are that there are no side effects being caused in the database, and the application is not doing any kind of locking. This approach can be referred to write first, read later.

There are two ways to achieve this technique:

  • Same batch, write then read, one roundtrip.
  • Two batches, first one: write, second one: read, two round trips.

If two batches are going to be used, the performance will end up being somewhat like the locking scenario, since there is a need of going several times to the database.

The example displayed below shows how to achieve it in only one batch; but then again this is part of the cons. By nature Postgres does not allow sending two parameterized query statements on the same batch; so the parameters should be resolved before sending the query which can led to SQL injection.

The example below shows how to bypass Postgres limit avoiding parameters; but this is not a good practice, as it can introduce security vulnerabilities in case we are dealing with parameters like strings.

func (repo *pgCustomerRepo) UpsertCustomerDoNothing(ctx context.Context, id uuid.UUID) (res domain.UpsertedRow, err error) {
	query :=
		"INSERT INTO customers(customer_id)	VALUES($1) ON CONFLICT DO NOTHING;" +
		"SELECT ctid, xmax, id FROM customers WHERE customer_id = $1"

	query = strings.ReplaceAll(query, "$1", fmt.Sprintf("'%s'", id.String()))
	row := repo.dbHandler.QueryRowContext(ctx, query)
	err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
	return
}
  1. Insert the customer, if the customer exists, do nothing.
  2. Fetch the customer.

The performance if this is accomplished in one batch is good, but don’t do it as security is more important than premature optimization:

EXECUTING FOR UPSERT DO NOTHING
Bucket         #     %       Histogram
[0s,    2ms]   3426  85.65%  ################################################################
[2ms,   4ms]   572   14.30%  ##########
[4ms,   6ms]   1     0.03%
[6ms,   8ms]   0     0.00%
[8ms,   10ms]  1     0.03%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

CTE queries

Last, this approach is not the most pleasant to read but it has some pros:

  • No side effects on the database.
  • No custom application locking.
  • Parameterized queries.
  • Works with database engines that do not support ON CONFLICT DO ...
EXECUTING FOR UPSERT CTE
Bucket         #     %       Histogram
[0s,    2ms]   2950  73.75%  #######################################################
[2ms,   4ms]   1034  25.85%  ###################
[4ms,   6ms]   14    0.35%
[6ms,   8ms]   2     0.05%
[8ms,   10ms]  0     0.00%
[10ms,  15ms]  0     0.00%
[15ms,  +Inf]  0     0.00%

The code is displayed below:

func (repo *pgCustomerRepo) UpsertCustomerCte(ctx context.Context, id uuid.UUID) (res domain.UpsertedRow, err error) {
	query :=
		"WITH " +
			"search AS (SELECT ctid, xmax, id FROM customers WHERE customer_id = $1 LIMIT 1)," +
			"add AS (INSERT INTO customers (customer_id) SELECT $1 WHERE NOT EXISTS(SELECT id from search) RETURNING ctid, xmax, id)" +
		"SELECT ctid, xmax, id from add	UNION ALL SELECT ctid, xmax, id from search"

	row := repo.dbHandler.QueryRowContext(ctx, query, id)
	err = row.Scan(&res.CTID, &res.XMAX, &res.ID)
	return
}
  1. Prepare the search statement.
  2. Prepare the add statement that will only add a customer if search does not return one.
  3. Execute both statements with a union, only one of the statements will return the customer.

Recap

In this post different approaches were battle tested; there is no holy grail or golden rule when it comes to database modelling.

The results showed here do not indicate that your project has to use CTE queries. Always the healthier approach is to ask yourself and the team:

  • Do we have this problem?
  • Why do we have this problem?
  • Is there anything we can do to get rid of this problem?
  • Brainstorm session, avoid premature optimization, avoid over engineering.
  • Take a decision.

Keeping this in mind, will probably avoid falling into design traps or solutions that were not needed in the first place.

As a last sentence, if you consider this is useful, do not hesitate on sharing this on your favorite social media. See you next time!