Leader elections with Postgres advisory locks

Modern container orchestration solutions like K8s make it very easy to run multiple instances of an application. If you're running stateless API servers that simply respond to HTTP requests, you can scale out to your heart's content. But certain applications cannot simply be scaled out, for example an application that runs scheduled cron jobs: with multiple identical instances, every one of them would run the same job. Ideally, you would want one instance to become the leader and either run the cron jobs itself or distribute the tasks among all instances.

Electing leaders by locking resources

The easiest way to elect a leader among multiple instances is to make all of them share a resource (a disk or a database). Every instance attempts to obtain a lock on the resource. This could mean writing a file to disk or inserting a record in the database. The instance that succeeds becomes the leader.

Redis, for instance, is arguably used as much for obtaining locks and synchronizing operations as it is for caching.
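
Going back to the database-record idea, here is a rough sketch of what that could look like with pgx. The leader_lock table, its single fixed-id row, and the tryBecomeLeader helper are assumptions for illustration, not part of the example that follows; whichever instance manages to insert the row first wins.

package main

import (
    "context"

    "github.com/jackc/pgx/v4/pgxpool"
)

// tryBecomeLeader tries to insert the single leader row; id is a fixed key
// and holder identifies this instance. ON CONFLICT DO NOTHING means the
// instance whose INSERT actually creates the row becomes the leader, while
// everyone else sees zero rows affected.
func tryBecomeLeader(ctx context.Context, pool *pgxpool.Pool, holder string) (bool, error) {
    tag, err := pool.Exec(ctx,
        "INSERT INTO leader_lock (id, holder) VALUES (1, $1) ON CONFLICT (id) DO NOTHING",
        holder,
    )
    if err != nil {
        return false, err
    }
    return tag.RowsAffected() == 1, nil
}

The catch with a plain record is that it never expires on its own: if the leader crashes without deleting its row, every other instance stays locked out until someone cleans it up. That is exactly the part that advisory locks handle for us.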

Electing leaders with advisory locks

Running leader elections is very simple if your application is already using Postgres: its advisory lock feature is well suited to the job.

package main

import (
    "context"
    "log"
    "time"

    "github.com/jackc/pgx/v4/pgxpool"
)

var pool *pgxpool.Pool

func leaderTasks(ctx context.Context) {
    // start the leader-only jobs (cron schedules, queue consumers, etc.) here;
    // they should run in their own goroutines and stop when ctx is cancelled
}

func runElectionLoop() {
    for {
        leaderStatusC := make(chan bool)
        go runElection(leaderStatusC)

        // if first message on channel is true then current instance is leader
        if isLeader := <-leaderStatusC; isLeader {
            // start interval based tasks when pod becomes leader
            ctx, cancel := context.WithCancel(context.TODO())
            leaderTasks(ctx)

            // second message on channel will be sent when the current pod is no longer the leader
            <-leaderStatusC

            // cancel all interval based jobs when no longer leader
            cancel()
        }

        // wait for 1 minute before retrying election
        <-time.After(1 * time.Minute)
    }
}

func runElection(leaderStatusC chan<- bool) {
    ctx := context.TODO()

    defer func() {
        leaderStatusC <- false
    }()

    conn, err := pool.Acquire(ctx)
    if err != nil {
        log.Default().Printf("failed to acquire connection: %s", err.Error())
        return
    }
    defer conn.Release()

    // 10 is an arbitrary application-wide key identifying the leader lock
    acquireLockQ := "SELECT pg_try_advisory_lock(10)"

    var becameLeader bool
    if err := conn.QueryRow(ctx, acquireLockQ).Scan(&becameLeader); err != nil {
        log.Default().Printf("leader election failed with error: %s", err.Error())
        return
    }
    if !becameLeader {
        log.Default().Printf("leader election failed")
        return
    }

    log.Default().Printf("new leader")
    leaderStatusC <- true

    for {
        <-time.After(1 * time.Minute)

        ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)

        // ensure that an advisory lock on the ID 10 is still held by the same connection as the one running this query
        checkLockQ := "SELECT count(*) FROM pg_locks WHERE pid = pg_backend_pid() AND locktype = 'advisory' AND objid = 10"

        var lockCount int
        lockCountErr := conn.QueryRow(ctx2, checkLockQ).Scan(&lockCount)
        cancel2()

        if lockCountErr != nil {
            log.Default().Printf("no longer leader: %s", lockCountErr.Error())
            break
        }
        if lockCount == 0 {
            log.Default().Printf("no longer leader: advisory lock is not held anymore")
            break
        }
    }
}

Postgres will handle the lock expiration process for us. The lock will automatically be released if the connection holding the lock is closed, allowing another instance to become the new leader if the leader instance crashes or loses network connectivity.
I have used the pgx library for the database connection pool in the example above; the code to connect to the database is not included in the sample. Postgres treats every connection in the pool as a separate session with its own backend PID, so the check for leader status must always run on the same connection that acquired the lock. That is why runElection holds on to the connection returned by pool.Acquire instead of querying through the pool.
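
If you want a rough idea of the missing setup, a minimal wiring could look like this. The DSN is a placeholder and this main function is an assumption of mine, not part of the original sample; it completes the file above by creating the pool and starting the election loop.

func main() {
    ctx := context.Background()

    // connect to Postgres and populate the package-level pool used by runElection
    p, err := pgxpool.Connect(ctx, "postgres://user:password@localhost:5432/mydb")
    if err != nil {
        log.Fatalf("failed to create connection pool: %s", err.Error())
    }
    defer p.Close()

    pool = p
    runElectionLoop()
}

Once the process exits (or the pool is closed), the session holding the advisory lock goes away, the lock is released, and another instance can win the next election.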

Hopefully, I've shown you how easy it is to run leader elections with Postgres. Please reach out to me on Twitter for comments and questions.