
Using the “hint_plan” Table Provided by the PostgreSQL Extension “pg_hint_plan”

Introduction

For those who have worked with Oracle, the pg_hint_plan extension lets you hint plans in ways that resemble features you are likely very familiar with:

  • sql_patch
  • sql_profile
  • sql_plan_baselines

While the functionality currently provided by pg_hint_plan is not nearly as robust as Oracle's (see the hints list), it does provide most of what you would encounter day to day as a DBA. That said, one thing currently missing is the ability to easily add hints without changing code via stored procedures / functions, as you can in Oracle. The only way to do this today in open source PostgreSQL is to manually manipulate a table named “hints”, typically located in the “hint_plan” schema.

The “hints” table which is provided by the extension is highly dependent (just like Oracle) on a normalized SQL statement. A normalized SQL statement in PostgreSQL is one that has all carriage returns removed, all spaces converted to single spaces and all literals and parameters replaced with a “?”. Typically you have to do this manually, but in this blog post, I am going to show how I have leveraged entries in “pg_stat_statements” along with custom written functions to normalize the statement and place it into the “hints” table. To use this “hints” table feature, the following setting must be enabled at either the session or system level:

set session pg_hint_plan.enable_hint_table to on;
or

in postgresql.conf:
pg_hint_plan.enable_hint_table = on

What Does a Normalized Statement Look Like?

Typically, when you receive code from a developer, or even code that you work on yourself, you format it to make it human readable and easier to interpret. For example, you might want your statement to look like this (notice the parameters / literals in the statement):

SELECT
    b.bid,
    sum(abalance)
FROM
    pgbench_branches b
    JOIN pgbench_accounts a ON (b.bid = a.bid)
WHERE
    b.bid = 12345
    AND a.aid BETWEEN 100 AND 200
GROUP BY
    b.bid
ORDER BY
    1;

Now to normalize the statement for use with the “hints” table it needs to look like this:

select b.bid, sum(abalance) from pgbench_branches b join pgbench_accounts a on (b.bid = a.bid) where b.bid = ? and a.aid between ? and ? group by b.bid order by 1;

You can either manipulate the statement manually to get it into this format, or you can attempt to do it programmatically. I prefer to let the system format it for me as much as possible, so I have written a few helper scripts to do this:

Helper Queries:

**** Feel free to use these functions; however, they may contain errors or may not normalize all statements. They depend on the pg_stat_statements view, and if the entire statement does not fit within the query field of that view, these functions will not produce the correct output. I will also place them on my public GitHub. If you find any errors or omissions, please let me know. ****

hint_plan.display_candidate_pg_hint_plan_queries

While you can easily select from the “hints” table on your own, this function shows what a normalized statement will look like before you load it into the table. Leave the “p_query_id” parameter null to return every query present in pg_stat_statements in normalized form, or populate it with a valid “queryid” to return a single normalized statement:

CREATE OR REPLACE FUNCTION hint_plan.display_candidate_pg_hint_plan_queries(
  p_query_id bigint default null
  )
  RETURNS TABLE(queryid bigint, norm_query_string text)
  LANGUAGE 'plpgsql'
  COST 100
  VOLATILE PARALLEL UNSAFE
AS $BODY$
 DECLARE 
 	pg_stat_statements_exists boolean := false;
 BEGIN
   SELECT EXISTS (
    SELECT FROM 
        information_schema.tables 
    WHERE 
        table_schema LIKE 'public' AND 
        table_type LIKE 'VIEW' AND
        table_name = 'pg_stat_statements'
    ) INTO pg_stat_statements_exists;
   IF NOT pg_stat_statements_exists THEN
    -- Nothing to normalize without the pg_stat_statements view.
    RAISE NOTICE 'pg_stat_statements extension has not been loaded, exiting';
    RETURN;
   END IF;
   -- '\s+' already matches carriage returns, tabs and newlines, so two
   -- regexp_replace calls are enough. The full text is returned (no substr)
   -- so the whole statement can be verified before it is loaded.
   RETURN QUERY
   SELECT pss.queryid,
          regexp_replace(
             regexp_replace(pss.query, '\$\d+', '?', 'g'),
                          '\s+', ' ', 'g') || ';'
   FROM pg_stat_statements pss
   WHERE p_query_id IS NULL OR pss.queryid = p_query_id;
 END; 
$BODY$;

If our candidate query was this:

select queryid, query from pg_stat_statements where queryid =  -8949523101378282526;
       queryid        |            query
----------------------+-----------------------------
 -8949523101378282526 | select b.bid, sum(abalance)+
                      | from pgbench_branches b    +
                      | join pgbench_accounts a    +
                      | on (b.bid = a.bid)         +
                      | where b.bid = $1           +
                      | group by b.bid             +
                      | order by 1
(1 row)

The display function would return the following normalized query:

SELECT hint_plan.display_candidate_pg_hint_plan_queries(p_query_id => -8949523101378282526);
-[ RECORD 1 ]--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------
display_candidate_pg_hint_plan_queries | (-8949523101378282526,"select b.bid, sum(abalance) from pgbench_branches b join pgbench_accounts a on (b.bid = a.bid) where b.bid = ? group by b.bid order by 1;")

You can then verify that the query is normalized properly and then move on toward using the next function to add the normalized query to the “hints” table.
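For a quick check outside the database, the same two rules (placeholders to “?”, whitespace runs to one space) can be mirrored in Python. This is only a sketch: `normalize_statement` is a hypothetical helper, it assumes the text already carries `$n` placeholders as pg_stat_statements stores it, and it does not attempt to replace literals. Unlike the SQL function, it also trims leading and trailing whitespace.

```python
import re


def normalize_statement(query: str) -> str:
    """Collapse a statement into the single-line form used by the hints table."""
    # Replace positional parameters ($1, $2, ...) with "?".
    text = re.sub(r"\$\d+", "?", query)
    # Collapse every run of whitespace (spaces, tabs, CR, LF) into one space.
    text = re.sub(r"\s+", " ", text).strip()
    # End with exactly one semicolon.
    return text.rstrip("; ") + ";"


candidate = """select b.bid, sum(abalance)
from pgbench_branches b
join pgbench_accounts a
on (b.bid = a.bid)
where b.bid = $1
group by b.bid
order by 1"""

print(normalize_statement(candidate))
```

The printed string can be compared with the output of the display function before anything is loaded into the “hints” table.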

hint_plan.add_stored_pg_hint_plan

Using the same query in the previous section, we will now add it to the “hints” table. This is where it is important to understand what hint you want to add.

CREATE OR REPLACE FUNCTION hint_plan.add_stored_pg_hint_plan(
  p_query_id bigint,
  p_hint_text text,
  p_application_name text default ''
  )
  RETURNS varchar
  LANGUAGE 'plpgsql'
  COST 100
  VOLATILE PARALLEL UNSAFE
AS $BODY$
-- p_hint_text can contain one or more hints separated by either a space or
-- a newline character.  Examples include:
-- Space separated: SeqScan(a) Parallel(a 0 hard)
-- Newline (LF) separated: SeqScan(a)'||chr(10)||'Parallel(a 0 hard)
-- Single hint: SeqScan(a)
-- 
-- Escaped text does not work: E'SeqScan(a)\nParallel(a 0 hard)'
 DECLARE 
 	hint_id hint_plan.hints.id%TYPE;
 	normalized_query_text hint_plan.hints.norm_query_string%TYPE;
 	pg_stat_statements_exists boolean := false;
 BEGIN
   SELECT EXISTS (
    SELECT FROM 
        information_schema.tables 
    WHERE 
        table_schema LIKE 'public' AND 
        table_type LIKE 'VIEW' AND
        table_name = 'pg_stat_statements'
    ) INTO pg_stat_statements_exists;
   IF NOT pg_stat_statements_exists THEN
    RAISE NOTICE 'pg_stat_statements extension has not been loaded, exiting';
    RETURN 'error';
   ELSE
    SELECT regexp_replace(
             regexp_replace(
                regexp_replace(
                   regexp_replace(
                      regexp_replace(query, '\$\d+', '?', 'g'),
                                E'\r', ' ', 'g'),
                              E'\t', ' ', 'g'),
                           E'\n', ' ', 'g'),
                         '\s+', ' ', 'g') || ';'
 	 INTO normalized_query_text
 	 FROM pg_stat_statements where queryid = p_query_id;
     IF normalized_query_text IS NOT NULL THEN
        -- RETURNING gives back the id of the row just inserted, even when
        -- the same statement is already stored for another application.
        INSERT INTO hint_plan.hints(norm_query_string, application_name, hints)
        VALUES (normalized_query_text,
                p_application_name,
                p_hint_text
        )
        RETURNING id INTO hint_id;
        RETURN cast(hint_id as text);
     ELSE
        RAISE NOTICE 'Query ID % does not exist in pg_stat_statements', p_query_id;
        RETURN 'error';
     END IF;
   END IF;
 END; 
$BODY$;

The hint text can contain one or more hints, separated by either a space or a newline character. Examples include:

  • Space separated: SeqScan(a) Parallel(a 0 hard)
  • Newline (LF) separated: SeqScan(a)'||chr(10)||'Parallel(a 0 hard)
  • Single hint: SeqScan(a)
  • Escaped text does not work in the context of this function, although it can be used if you are inserting manually into the “hints” table: E'SeqScan(a)\nParallel(a 0 hard)'

Adding the hint for our candidate query looks like this:

SELECT hint_plan.add_stored_pg_hint_plan(p_query_id => -8949523101378282526,
						p_hint_text => 'SeqScan(a) Parallel(a 0 hard)',
						p_application_name => '');

-[ RECORD 1 ]-----------+---
add_stored_pg_hint_plan | 28

Time: 40.889 ms

select * from hint_plan.hints where id = 28;
-[ RECORD 1 ]-----+------------------------------------------------------------------------------------------------------------------------------------------
id                | 28
norm_query_string | select b.bid, sum(abalance) from pgbench_branches b join pgbench_accounts a on (b.bid = a.bid) where b.bid = ? group by b.bid order by 1;
application_name  |
hints             | SeqScan(a) Parallel(a 0 hard)

In the above example, we are forcing a serial sequential scan of the “pgbench_accounts” table. We left the “application name” parameter empty so that the hint applies to any calling application.
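If you would rather insert into the “hints” table manually from a client program, the statement and its parameters could be assembled roughly as follows. This is a sketch, not part of the extension: `build_hint_insert` is a hypothetical helper, the column names are the ones used by the function above, and the `%(name)s` placeholders assume a psycopg2-style driver.

```python
from typing import Dict, Iterable, Tuple


def build_hint_insert(
    norm_query_string: str,
    hints: Iterable[str],
    application_name: str = "",
) -> Tuple[str, Dict[str, str]]:
    """Return (sql, params) for a manual insert into hint_plan.hints."""
    sql = (
        "INSERT INTO hint_plan.hints (norm_query_string, application_name, hints) "
        "VALUES (%(norm_query_string)s, %(application_name)s, %(hints)s)"
    )
    params = {
        "norm_query_string": norm_query_string,
        # An empty application name applies the hint to any calling application.
        "application_name": application_name,
        # Several hints are joined with a single space.
        "hints": " ".join(hints),
    }
    return sql, params


sql, params = build_hint_insert(
    "select b.bid, sum(abalance) from pgbench_branches b join pgbench_accounts a "
    "on (b.bid = a.bid) where b.bid = ? group by b.bid order by 1;",
    ["SeqScan(a)", "Parallel(a 0 hard)"],
)
print(params["hints"])
```

With psycopg2, the pair would then be passed to `cursor.execute(sql, params)`, which keeps the statement text and hint text out of string concatenation.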

hint_plan.delete_stored_pg_hint_plan

You could just as easily issue a delete against the “hints” table, but in keeping with the function-based approach used so far, a delete helper has also been developed:

CREATE OR REPLACE FUNCTION hint_plan.delete_stored_pg_hint_plan(
  p_hint_id bigint
  )
  RETURNS TABLE(id integer, norm_query_string text, application_name text, hints text)
  LANGUAGE 'plpgsql'
  COST 100
  VOLATILE PARALLEL UNSAFE
AS $BODY$
 BEGIN
    RETURN QUERY
    DELETE FROM hint_plan.hints h WHERE h.id = p_hint_id RETURNING *;
 END; 
$BODY$;

To delete a hint, call the function as follows:

 SELECT hint_plan.delete_stored_pg_hint_plan(p_hint_id => 28);
-[ RECORD 1 ]--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
delete_stored_pg_hint_plan | (28,"select b.bid, sum(abalance) from pgbench_branches b join pgbench_accounts a on (b.bid = a.bid) where b.bid = ? group by b.bid order by 1;","","SeqScan(a) Parallel(a 0 hard)")

Time: 33.685 ms
select * from hint_plan.hints where id = 28;
(0 rows)

Time: 24.868 ms

As you can see, the “hints” table is very useful and can help you emulate many parts of SQL Plan Management, just like in Oracle.

Enjoy and all feedback is welcomed!!!

Leverage Google Cloud Logging + Monitoring for Custom Cloud SQL for Postgres or AlloyDB Alerts

As migrations to Cloud SQL and AlloyDB pick up speed, you will inevitably run into situations where the cloud tooling has not quite caught up with the custom alerts and incidents that you may be exposing on-premises with tools such as Nagios or Oracle Enterprise Manager. One such example is monitoring replication tools through something like the GoldenGate heartbeat table. While there are many ways to implement this, I wanted to demonstrate one that leverages Google Cloud Logging + Google Cloud Monitoring. This method lets us keep a long-term log of parameters such as lag, or anything else you have built into the heartbeat mechanism. To demonstrate, let's use Python to query the database and create a Cloud Logging entry:

import argparse
from datetime import datetime
from sqlalchemy import create_engine, text
from google.cloud import logging


def retrievePgAlert(
    username: str,
    password: str,
    hostname: str,
    portNumber: int,
    databaseName: str,
    alertType: str,
) -> None:

    alertList: list = []

    conn_string = f"postgresql+psycopg2://{username}:{password}@{hostname}:{portNumber}/{databaseName}?client_encoding=utf8"
    engine = create_engine(conn_string)
    with engine.connect() as con:

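        if alertType == "ogg-lag":
            # Pass the interval as a numeric bind parameter via make_interval()
            # instead of embedding the placeholder inside a quoted literal.
            sqlQuery = text(
                "select replicat, effective_date, lag from ogg.heartbeat "
                "where lag >= :lagAmt "
                "and effective_date >= now() - make_interval(mins => :intervalAmt)"
            )
        else:
            raise ValueError(f"Unsupported alert type: {alertType}")

        result = con.execute(
            sqlQuery, {"lagAmt": oggLagAmt, "intervalAmt": checkIntervalMinutes}
        ).fetchall()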
        for row in result:
            alertList.append(row)

        if not alertList:
            print(f"No alerts as of {datetime.now().strftime('%m/%d/%Y %H:%M:%S')}")
        else:
            for alertText in alertList:
                print(
                    f"Replicat: {alertText[0]} at date {alertText[1]} has a total lag of: {alertText[2]} seconds"
                )

            writeGcpCloudLoggingAlert(
                logger_alert_type=alertType,
                loggerName=args.loggerName,
                logger_message=alertList,
            )

    con.close()
    engine.dispose()


def writeGcpCloudLoggingAlert(
    logger_alert_type: str,
    loggerName: str,
    logger_message: list,
) -> None:

    # Writes log entries to the given logger.
    logging_client = logging.Client()

    # This log can be found in the Cloud Logging console under 'Custom Logs'.
    logger = logging_client.logger(loggerName)

    # Struct log. The struct can be any JSON-serializable dictionary.
    if logger_alert_type == "ogg-lag":
        for alertFields in logger_message:
            # Each row is (replicat, effective_date, lag).
            replicatName = str(alertFields[0])
            effectiveDate: datetime = alertFields[1]
            lagAmount = int(alertFields[2])

            logger.log_struct(
                {
                    "alertType": logger_alert_type,
                    "replicat": replicatName,
                    "alertDate": effectiveDate.strftime("%m/%d/%Y, %H:%M:%S"),
                    "alertRetrievalDate": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                    "lagInSeconds": lagAmount,
                },
                severity="ERROR",
            )

    print("Wrote logs to {}.".format(logger.name))


def delete_logger(loggerName):
    """Deletes a logger and all its entries.

    Note that a deletion can take several minutes to take effect.
    """
    logging_client = logging.Client()
    logger = logging_client.logger(loggerName)

    logger.delete()

    print("Deleted all logging entries for {}".format(logger.name))


if __name__ == "__main__":

    cloudSQLHost: str = "127.0.0.1"
    hostname: str
    portNumber: str
    database: str
    username: str
    password: str
    oggLagAmt: int = 15
    checkIntervalMinutes: int = 20

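    # open() does not expand "~", so resolve the home directory explicitly.
    import os

    with open(os.path.expanduser("~/.pgpass"), "r") as pgpassfile: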
        for line in pgpassfile:
            if line.strip().split(":")[0] == cloudSQLHost:
                hostname, portNumber, database, username, password = line.strip().split(
                    ":"
                )

    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "-loggerName",
        "--loggerName",
        type=str,
        help="GCP Cloud Log Namespace",
        default="postgres-alert",
    )
    parser.add_argument(
        "-alertType",
        "--alertType",
        type=str,
        help="Type of alert to log",
        default="ogg-lag",
    )
    args = parser.parse_args()

    if args.alertType == "ogg-lag":
        retrievePgAlert(
            hostname=hostname,
            username=username,
            password=password,
            portNumber=portNumber,
            databaseName=database,
            alertType=args.alertType,
        )

In this script we use the Google Cloud Logging APIs, SQLAlchemy and some other basic Python imports to query the heartbeat table for rows at or above the lag amount we are looking for.

***Note: The Python code could check for any other condition by changing the query, or by leveraging “gcloud” commands or REST API calls instead.
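The script picks its connection details out of .pgpass by splitting each line on “:”. A slightly more defensive version of that lookup might look like the sketch below. The helper names are hypothetical; it assumes the standard five-field layout (hostname:port:database:username:password) with “\” escaping a literal “:” or “\”, skips comment lines, and does not handle “*” wildcards.

```python
from typing import Iterable, List, Optional


def split_pgpass_line(line: str) -> List[str]:
    """Split one .pgpass line on unescaped colons, removing the escapes."""
    fields: List[str] = []
    current: List[str] = []
    escaped = False
    for ch in line.rstrip("\r\n"):
        if escaped:
            # The previous character was a backslash: keep this one literally.
            current.append(ch)
            escaped = False
        elif ch == "\\":
            escaped = True
        elif ch == ":":
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    fields.append("".join(current))
    return fields


def find_pgpass_entry(lines: Iterable[str], host: str) -> Optional[List[str]]:
    """Return [hostname, port, database, username, password] for host, or None."""
    for line in lines:
        if not line.strip() or line.lstrip().startswith("#"):
            continue  # skip blank lines and comments
        fields = split_pgpass_line(line)
        if len(fields) == 5 and fields[0] == host:
            return fields
    return None


sample = [
    "# monitoring account\n",
    "10.0.0.5:5432:hr:app:secret\n",
    "127.0.0.1:5432:postgres:monitor:p\\:ss\n",
]
print(find_pgpass_entry(sample, "127.0.0.1"))
```

Returning None when no line matches lets the caller fail with a clear message instead of hitting an undefined variable later on.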

If the condition is met, the script creates a JSON message which is then written to the appropriate Google Cloud Logging Namespace. An example of the JSON message is below (sensitive information like the project id and instance id have been redacted):

{
  "insertId": "1b6fb35g18b606n",
  "jsonPayload": {
    "alertRetrievalDate": "01/20/2023, 18:47:20",
    "lagInSeconds": 15,
    "alertType": "ogg-lag",
    "alertDate": "01/20/2023, 18:34:55",
    "replicat": "r_hr"
  },
  "resource": {
    "type": "gce_instance",
    "labels": {
      "project_id": "[project id]",
      "instance_id": "****************",
      "zone": "projects/[project id]/zones/us-central1-c"
    }
  },
  "timestamp": "2023-01-20T18:47:20.103058301Z",
  "severity": "ERROR",
  "logName": "projects/[project id]/logs/postgres-alert",
  "receiveTimestamp": "2023-01-20T18:47:20.103058301Z"
}
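The “jsonPayload” portion of that entry corresponds to the dictionary the script hands to log_struct. Building it in a separate function makes it possible to check the payload without Google Cloud credentials. The sketch below uses a hypothetical `build_lag_payload` helper and assumes each row is (replicat, effective_date, lag), as in the query above; the sample values are the ones from the example entry.

```python
from datetime import datetime
from typing import Any, Dict, Sequence

DATE_FORMAT = "%m/%d/%Y, %H:%M:%S"


def build_lag_payload(
    row: Sequence[Any],
    retrieved_at: datetime,
    alert_type: str = "ogg-lag",
) -> Dict[str, Any]:
    """Turn one heartbeat row into the structured payload for Cloud Logging."""
    replicat, effective_date, lag = row[0], row[1], row[2]
    return {
        "alertType": alert_type,
        "replicat": str(replicat),
        "alertDate": effective_date.strftime(DATE_FORMAT),
        "alertRetrievalDate": retrieved_at.strftime(DATE_FORMAT),
        # Stored as a number so the alert query can compare it with >=.
        "lagInSeconds": int(lag),
    }


payload = build_lag_payload(
    ("r_hr", datetime(2023, 1, 20, 18, 34, 55), 15),
    retrieved_at=datetime(2023, 1, 20, 18, 47, 20),
)
print(payload)
```

The resulting dictionary could be passed straight to `logger.log_struct(payload, severity="ERROR")`.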

Create a Cloud Logging Alert

Now that we have published a message to Cloud Logging, what can we do with it? Generally there are two paths: a Cloud Metric or a Cloud Alert. For this demonstration, we will use the “Cloud Alert”. To start the setup, navigate to the console page “Operations Logging” > “Logs Explorer” and click the “Create alert” function, which opens the alert dialog. In step 2, double check the query that retrieves the appropriate logs. In step 3, choose the time between notifications (this mutes alerts that occur within the interval) and how long after the last alert an incident stays open. In this case, we will mute duplicate alerts for 5 minutes after the first alert (if an alert occurs at 6 minutes, another notification will fire), and incidents will remain open for 30 minutes past the last alert (no new incident will be logged unless an alert occurs after that time frame). The query to be used within the alert is as follows:


logName="projects/[project id]/logs/postgres-alert"
AND severity="ERROR"
AND (jsonPayload.alertType = "ogg-lag")
AND (jsonPayload.lagInSeconds >= 15)
AND resource.labels.instance_id = "[instance id]"
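If the same filter is needed for several projects or thresholds, it could be generated rather than typed by hand. The helper below is a hypothetical sketch that only assembles the filter text shown above; the project id and instance id in the example call are made-up placeholders.

```python
def build_alert_filter(
    project_id: str,
    instance_id: str,
    min_lag_seconds: int = 15,
    log_name: str = "postgres-alert",
    alert_type: str = "ogg-lag",
) -> str:
    """Assemble the Logs Explorer filter used by the alert."""
    clauses = [
        f'logName="projects/{project_id}/logs/{log_name}"',
        'severity="ERROR"',
        f'(jsonPayload.alertType = "{alert_type}")',
        # Numeric comparison, so the threshold is not quoted.
        f"(jsonPayload.lagInSeconds >= {int(min_lag_seconds)})",
        f'resource.labels.instance_id = "{instance_id}"',
    ]
    return "\nAND ".join(clauses)


# "my-project" and "1234567890" are placeholder values for illustration only.
print(build_alert_filter("my-project", "1234567890"))
```

Keeping the threshold as a parameter makes it easier to keep the filter in step with the lag amount used by the Python script.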

The following dialogs outline the screens used to set up the alert.

The last step will be to choose your notification method, which is managed by different notification channels. The different types of notification channels include:

  • Mobile Devices
  • PagerDuty Services
  • PagerDuty Sync
  • Slack
  • Webhooks
  • E-Mail
  • SMS
  • Pub/Sub

Once all of this is defined, your alert will notify you as soon as you place the Python script on an appropriate schedule, such as Linux cron, Google Cloud Scheduler, etc. From there, we simply wait for an issue to occur that matches the alert. When it does, an email like the following is sent to the notification channel:

As your migration to cloud continues, keep an open mind and look for alternative ways to handle all of the operational “things” you are accustomed to in your on-premises environment. Most of the time there is a way in cloud to handle it!