I am using node v16.14.0.
The below is a node script that is trying to insert message data into Postgres db after pulling a message from a platform event, but some how it is not able to.
It hits the line "let client = await pool.connect();" and continues with the next message. It never inserts into the table.
I am not sure what I am doing wrong here?
const jsforce = require("jsforce");
const { Pool }= require("pg");
const channel = "/event/Short_Url_Click_Notification__e";
let replayId = -2 // -1 = Only New messages | -2 = All Window and New
console.log('db = ' +process.env.DATABASE_URL);
console.log('sfu= ' +process.env.SF_USERNAME);
console.log('sfp= ' +process.env.SF_PASSWORD);
// sf connection
const sfConnection = new jsforce.Connection();
const pool = new Pool({
    connectionString:  process.env.DATABASE_URL,
    ssl:{
        rejectUnauthorized: false
    }
});
try {
    pool.query("SELECT replay_id, date_str, channel FROM platform_event_setting WHERE channel = $1 LIMIT 1", [channel], (err, res) => {
        if (err) {
            console.error("Unable to Query platform_event_setting: ", err);
            pgClient.end();
            process.exit(1);
        }
        if (res.rowCount > 0) {
            replayId = res.rows[0].replay_id;
        }
        sfConnection.login(process.env.SF_USERNAME, process.env.SF_PASSWORD,  (err, res) => {
            if (err) {
                console.error(err);
                return process.exit(1);
            }
            const streamingClient = sfConnection.streaming.createClient([
                new jsforce.StreamingExtension.Replay(channel, replayId),
                new jsforce.StreamingExtension.AuthFailure(function () {
                    return process.exit(1);
                }),
            ]);
            subscription = streamingClient.subscribe(channel, async (message) => {
                try {
                    console.log("received data", JSON.stringify(message));
                    event_values = [channel, message.event.replayId, message.payload.CreatedDate];
                    let client = await pool.connect();
                    client.query("BEGIN");
                    try {
                        // process message
                        await client.query(`INSERT INTO platform_event_setting
                                            (channel, replay_id, date_str)
                                        VALUES ($1, $2, $3)
                                        ON CONFLICT ON CONSTRAINT platform_event_setting_pkey
                                        DO UPDATE SET (replay_id, date_str) = (EXCLUDED.replay_id, EXCLUDED.date_str)
                                        `, event_values);
                        client.query("COMMIT");
                    } catch (e) {
                        console.error(e);
                        client.query("ROLLBACK");
                        return process.exit(1);
                    }
                } finally {
                    client.release();
                }
            }); // subscritpion
        }); // sf connection
    }); // pool
} catch (e) {
    console.error(e);
}
The above script pulls a message from Salesforce platform events bus and trying to insert into a Postgres database.