PostgreSQL® JDBC API 扩展

PostgreSQL® 是一个可扩展的数据库系统。您可以向服务器添加自己的函数,然后从查询中调用这些函数,甚至添加自己的数据类型。由于这些是 PostgreSQL® 独有的功能,我们通过一组扩展 API 从 Java 中支持它们。标准驱动程序核心中的某些功能实际上使用这些扩展来实现大型对象等。

要访问某些扩展,您需要在 org.postgresql.PGConnection 类中使用一些额外的 方法。在这种情况下,您需要将 Driver.getConnection() 的返回值强制转换为该类型。例如

Connection db = Driver.getConnection(url, username, password);
// ...
// later on
Fastpath fp = db.unwrap(org.postgresql.PGConnection.class).getFastpathAPI();

PostgreSQL® 提供了一组数据类型,可以将几何特征存储到表中。这些类型包括单个点、线和多边形。我们在 Java 中使用 org.postgresql.geometric 包支持这些类型。有关可用类和功能的详细信息,请参阅进一步阅读中提到的 Javadoc。

import java.sql.*;

import org.postgresql.geometric.PGpoint;
import org.postgresql.geometric.PGcircle;

public class GeometricTest {
    public static void main(String args[]) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/test";
        try (Connection conn = DriverManager.getConnection(url, "test", "")) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TEMP TABLE geomtest(mycirc circle)");
            }
            insertCircle(conn);
            retrieveCircle(conn);
        }
    }

    private static void insertCircle(Connection conn) throws SQLException {
        PGpoint center = new PGpoint(1, 2.5);
        double radius = 4;
        PGcircle circle = new PGcircle(center, radius);
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO geomtest(mycirc) VALUES (?)")) {
            ps.setObject(1, circle);
            ps.executeUpdate();
        }
    }

    private static void retrieveCircle(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            try (ResultSet rs = stmt.executeQuery("SELECT mycirc, area(mycirc) FROM geomtest")) {
                while (rs.next()) {
                    PGcircle circle = (PGcircle) rs.getObject(1);
                    double area = rs.getDouble(2);

                    System.out.println("Center (X, Y) = (" + circle.center.x + ", " + circle.center.y + ")");
                    System.out.println("Radius = " + circle.radius);
                    System.out.println("Area = " + area);
                }
            }
        }
    }
}

标准 JDBC 规范支持大对象。但是,该接口有限,PostgreSQL® 提供的 API 允许对对象内容进行随机访问,就像它是本地文件一样。

org.postgresql.largeobject 包为 Java 提供了 libpq C 接口的大对象 API。它包含两个类,LargeObjectManager,用于处理创建、打开和删除大对象,以及LargeObject,用于处理单个对象。有关此 API 的示例用法,请参阅在 JDBC 中处理二进制数据

Listen 和 Notify 提供了一种简单的信号或进程间通信机制,用于访问同一个 PostgreSQL® 数据库的多个进程。有关通知的更多信息,请参阅主服务器文档。本节仅介绍通知的 JDBC 特定方面。

标准 LISTENNOTIFYUNLISTEN 命令通过标准 Statement 接口发出。要检索和处理检索到的通知,Connection 必须转换为 PostgreSQL® 特定扩展接口 PGConnection。从那里,可以使用 getNotifications() 方法检索任何未决通知。

注意

JDBC 驱动程序的一个主要限制是它无法接收异步通知,必须轮询后端以检查是否发出了任何通知。可以为轮询函数提供超时,但随后来自其他线程的语句执行将被阻塞。

import java.sql.*;

public class NotificationTest {
    public static void main(String args[]) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://localhost:5432/test";

        // Create two distinct connections, one for the notifier
        // and another for the listener to show the communication
        // works across connections although this example would
        // work fine with just one connection.

        Connection lConn = DriverManager.getConnection(url, "test", "");
        Connection nConn = DriverManager.getConnection(url, "test", "");

        // Create two threads, one to issue notifications and
        // the other to receive them.

        Listener listener = new Listener(lConn);
        Notifier notifier = new Notifier(nConn);
        listener.start();
        notifier.start();
    }
}

class Listener extends Thread {
    private Connection conn;
    private org.postgresql.PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = conn.unwrap(org.postgresql.PGConnection.class);
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN mymessage");
        stmt.close();
    }

    public void run() {
        try {
            while (true) {
                org.postgresql.PGNotification notifications[] = pgconn.getNotifications();

                // If this thread is the only one that uses the connection, a timeout can be used to
                // receive notifications immediately:
                // org.postgresql.PGNotification notifications[] = pgconn.getNotifications(10000);

                if (notifications != null) {
                    for (int i = 0; i < notifications.length; i++)
                        System.out.println("Got notification: " + notifications[i].getName());
                }

                // wait a while before checking again for new
                // notifications

                Thread.sleep(500);
            }
        } catch (SQLException sqle) {
            sqle.printStackTrace();
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    }
}

class Notifier extends Thread {
    private Connection conn;

    public Notifier(Connection conn) {
        this.conn = conn;
    }

    public void run() {
        while (true) {
            try {
                Statement stmt = conn.createStatement();
                stmt.execute("NOTIFY mymessage");
                stmt.close();
                Thread.sleep(2000);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

PostgreSQL® 服务器允许客户端编译预计要重复使用的 SQL 语句,以避免每次执行时解析和规划语句的开销。此功能从服务器版本 7.3 开始在 SQL 级别通过 PREPARE 和 EXECUTE 提供,从服务器版本 7.4 开始在协议级别提供,但作为 Java 开发人员,我们实际上只想使用标准的 PreparedStatement 接口。

注意

PostgreSQL® 9.2 版本说明:准备好的语句以前只优化一次,没有任何关于参数值的知识。在 9.2 中,规划器将使用关于发送参数的特定计划(查询将在执行时规划),除非查询被执行多次,并且规划器决定通用计划并不比特定计划贵太多。

服务器端准备好的语句可以提高执行速度,因为

  1. 它只发送语句句柄(例如 S_1),而不是完整的 SQL 文本
  2. 它允许使用二进制传输(例如二进制 int4、二进制时间戳等);参数和结果解析速度更快
  3. 它允许重用服务器端执行计划
  4. 客户端可以重用结果集列定义,因此它不必在每次执行时接收和解析元数据

驱动程序的早期版本使用 PREPARE 和 EXECUTE 来实现服务器端准备好的语句。
这在所有从 7.3 开始的服务器版本上都受支持,但会导致应用程序可见的查询结果更改,例如缺少 ResultSet 元数据和行更新计数。当前驱动程序使用 V3 协议级等效项,避免了这些查询结果的更改。扩展查询协议准备了一个临时的“未命名语句”。有关详细信息,请参阅 扩展查询 第 53.2.3 节。

当使用 PreparedStatement API 时,驱动程序默认使用扩展协议。

一个内部计数器跟踪语句执行的次数,当它达到 prepareThreshold(默认值为 5)时,驱动程序将切换到创建命名语句并使用 PrepareExecute

出于性能原因,通常最好重用同一个 PreparedStatement 对象,但是驱动程序能够在 connection.prepareStatement(...) 调用中自动服务器准备语句。

例如

PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 42);
ps.executeQuery().close();
ps.close();

PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 43);
ps.executeQuery().close();
ps.close();

PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");

ps.setInt(1, 42);
ps.executeQuery().close();

ps.setInt(1, 43);
ps.executeQuery().close();

但是,pgJDBC 在两种情况下都可以使用服务器端预备语句。

注意

Statement 对象绑定到 Connection,从多个并发线程访问同一个 Statement 和/或 Connection 不是一个好主意(除了 cancel()close() 和类似情况)。最好直接 close() 语句,而不是尝试以某种方式缓存它。

服务器端预备语句在客户端和服务器上都会消耗内存,因此 pgJDBC 限制每个连接的服务器端预备语句数量。它可以通过 preparedStatementCacheQueries(默认值为 256,即 pgJDBC 已知的查询数量)和 preparedStatementCacheSizeMiB(默认值为 5,即每个连接的客户端缓存大小,以兆字节为单位)进行配置。只有 statement cache 的一个子集是服务器端预备的,因为某些语句可能无法达到 prepareThreshold

在某些情况下,您可能希望禁用服务器端预备语句的使用。例如,如果您将连接路由到与服务器端预备语句不兼容的负载均衡器,则别无选择。

您可以通过设置 prepareThreshold=0 来禁用服务器端预备语句的使用。

V3 协议避免在每次执行时发送列元数据,并且 BIND 消息指定输出列格式。这会为以下情况造成问题

SELECT * FROM mytable;
ALTER mytable ADD column ...;
SELECT * FROM mytable;

这会导致 缓存的计划不能更改结果类型 错误,并导致事务失败。

建议是

  1. 在 SELECT 列表中使用显式列名
  2. 避免更改列类型

存在明确的命令来释放所有服务器端准备好的语句。这将导致以下服务器端错误消息:准备好的语句名称无效。当然,这可能会破坏 pgJDBC,但是有些情况下您需要丢弃语句(例如,在大量 DDL 后)。

建议是

  1. 使用简单的 DEALLOCATE ALL 和/或 DISCARD ALL 命令,避免将命令嵌套到 pl/pgsql 或类似的命令中。驱动程序可以理解顶层的 DEALLOCATE/DISCARD 命令,并且也会使客户端缓存失效。
  2. 重新连接。缓存是针对每个连接的,因此如果您重新连接,它将失效。

PostgreSQL® 允许自定义 search_path,它为开发人员提供了强大的功能。有了强大的功能,以下情况可能会发生

set search_path='app_v1';
SELECT * FROM mytable;
set search_path='app_v2';
SELECT * FROM mytable; -- Does mytable mean app_v1.mytable or app_v2.mytable here?

服务器端准备好的语句链接到数据库对象 ID,因此它可以从“旧的”app_v1.mytable 表中获取数据。很难判断哪种行为是预期的,但是 pgJDBC 尝试跟踪 search_path 更改,并相应地使准备缓存失效。

建议是

  1. 避免频繁更改 search_path,因为它会使服务器端准备好的语句失效。
  2. 使用简单的 set search_path... 命令,避免将命令嵌套到 pl/pgsql 或类似的命令中,否则 pgJDBC 将无法识别 search_path 更改。

令人遗憾的是,一个简单的 cached plan must not change result type 错误会导致整个事务失败。驱动程序可以在某些情况下自动重新执行语句。

  1. 如果事务没有失败(例如,在执行导致 cached plan... 错误的语句之前,事务不存在),那么 pgJDBC 会自动重新执行语句。这使应用程序能够正常运行,并避免不必要的错误。
  2. 如果事务处于失败状态,则除了回滚它之外别无他法。pgJDBC 确实具有“自动保存点”功能,它可以自动回滚并重试语句。此行为由 autosave 属性控制(默认值为 never)。conservative 的值将针对与无效服务器准备语句相关的错误自动回滚。

注意

autosave 可能会导致长时间事务的严重性能问题,因为 PostgreSQL® 后端没有针对长时间事务和大量保存点的场景进行优化。

PostgreSQL® 复制连接不允许使用服务器端准备语句,因此 pgJDBC 在激活 replication 连接属性的情况下使用简单查询。

默认情况下,pgJDBC 仅对 PreparedStatement 使用服务器端预处理语句,但您可能希望为常规 Statement 激活服务器端预处理语句。例如,如果您通过 con.createStatement().executeQuery(...) 执行相同的语句,那么您可以通过缓存语句来提高性能。当然,最好显式使用 PreparedStatements,但驱动程序可以选择缓存简单的语句。

您可以通过将 preferQueryMode 设置为 extendedCacheEverything 来实现。

注意

该选项更像是一种诊断/调试方式,因此请谨慎使用。

数据库针对给定的参数类型优化执行计划。考虑以下情况

-- create table rooms (id int4, name varchar);
-- create index name__rooms on rooms(name);
PreparedStatement ps = con.prepareStatement("select id from rooms where name=?");
ps.setString(1, "42");

它按预期工作,但是如果使用 setInt 会发生什么?ps.setInt(1, 42);

即使结果相同,第一个变体(setString 案例)使数据库能够使用索引 name__rooms,而第二个变体则不能。如果数据库获取到 42 作为整数,它将使用类似 where cast(name as int4) = ? 的计划。

该计划必须针对(SQL 文本参数类型)组合是特定的,因此如果使用不同的参数类型,驱动程序必须使服务器端预处理语句失效。

对于批处理操作来说,这尤其令人头疼,因为你不想通过使用交替的数据类型来中断批处理。

最典型的案例如下(切勿在生产环境中使用此方法)

PreparedStatement ps = con.prepareStatement("select id from rooms where ...");
if (param instanceof String) {
    ps.setString(1, param);
} else if (param instanceof Integer) {
    ps.setInt(1, ((Integer) param).intValue());
} else {
    // Does it really matter which type of NULL to use?
    // In fact, it does since data types specify which server-procedure to call
    ps.setNull(1, Types.INTEGER);
}

正如你可能猜到的,setStringsetNull(..., Types.INTEGER) 会导致数据类型交替,并迫使驱动程序在服务器端语句中失效并重新准备。

建议对每个绑定占位符使用一致的数据类型,并对 setNull 使用相同的类型。查看 org.postgresql.test.jdbc2.PreparedStatementTest.testAlternatingBindType 示例以了解更多详细信息。

如果你遇到 cached plan must not change result typeprepared statement \"S_2\" does not exist,以下内容可能有助于调试这种情况。

  1. 客户端日志记录。如果你添加 loggerLevel=TRACE&loggerFile=pgjdbc-trace.log,你将获得驱动程序和后端之间发送的消息的跟踪。
  2. 你可以检查 org.postgresql.test.jdbc2.AutoRollbackTestSuite,因为它验证了许多组合。
import java.sql.*;

public class ServerSidePreparedStatement {

    public static void main(String args[]) throws Exception {
        
        String url = "jdbc:postgresql://localhost:5432/test";
        try (Connection conn = DriverManager.getConnection(url, "test", "")){

            try (PreparedStatement pstmt = conn.prepareStatement("SELECT ?")){

                // cast to the pg extension interface
                org.postgresql.PGStatement pgstmt = pstmt.unwrap(org.postgresql.PGStatement.class);

                // on the third execution start using server side statements
                pgstmt.setPrepareThreshold(3);

                for (int i = 1; i <= 5; i++) {
                    pstmt.setInt(1, i);
                    boolean usingServerPrepare = pgstmt.isUseServerPrepare();
                    ResultSet rs = pstmt.executeQuery();
                    rs.next();
                    System.out.println("Execution: " + i + ", Used server side: " + usingServerPrepare + ", Result: " + rs.getInt(1));
                    rs.close();
                }
            }        
        }
    }
}

这会在第三次执行时产生使用服务器端预处理语句的预期结果。

执行 使用服务器端 结果
1 false 1
2 false 2
3 true 3
4 true 4
5 true 5

上面显示的示例要求程序员在理论上可移植的 API 中使用 PostgreSQL® 特定的代码,这不是理想的。它还只为该特定语句设置了阈值,如果我们想为每个语句使用该阈值,则需要额外输入。让我们看看设置阈值以启用服务器端预处理语句的其他方法。在 PreparedStatement 之上已经存在一个层次结构,它来自创建它的 Connection,以及连接的来源,无论是 Datasource 还是 URL。可以在这些级别中的任何一个级别设置服务器端预处理语句阈值,这样该值将成为其所有子级的默认值。

// pg extension interfaces
org.postgresql.PGConnection pgconn;
org.postgresql.PGStatement pgstmt;

// set a prepared statement threshold for connections created from this url
String url = "jdbc:postgresql://localhost:5432/test?prepareThreshold=3";

// see that the connection has picked up the correct threshold from the url
Connection conn = DriverManager.getConnection(url, "test", "");
pgconn = conn.unwrap(org.postgresql.PGConnection.class);
System.out.println(pgconn.getPrepareThreshold()); // Should be 3

// see that the statement has picked up the correct threshold from the connection
PreparedStatement pstmt = conn.prepareStatement("SELECT ?");
pgstmt = pstmt.unwrap(org.postgresql.PGStatement.class);
System.out.println(pgstmt.getPrepareThreshold()); // Should be 3

// change the connection's threshold and ensure that new statements pick it up
pgconn.setPrepareThreshold(5);
PreparedStatement pstmt = conn.prepareStatement("SELECT ?");
pgstmt = pstmt.unwrap(org.postgresql.PGStatement.class);
System.out.println(pgstmt.getPrepareThreshold()); // Should be 5

PostgreSQL® 支持服务器参数,也称为服务器变量,或在内部称为 Grand Unified Configuration (GUC) 变量。这些变量由 SET 命令、postgresql.confALTER SYSTEM SETALTER USER SETALTER DATABASE SETset_config(...) SQL 可调用函数等操作。请参阅 PostgreSQL 手册

对于这些变量的一个子集,服务器将自动将值的变化报告给客户端驱动程序和应用程序。这些变量在内部称为 GUC_REPORT 变量,以启用该功能的标志的名称命名。

服务器跟踪所有变量范围,并在变量恢复到先前值时报告,因此客户端不必猜测当前值是什么,以及某些服务器端函数是否可能更改了它。无论出于何种原因或如何更改,只要值发生变化,服务器就会在参数状态协议消息中向客户端报告新的有效值。pgJDBC 在内部使用许多这些报告。

从 pgJDBC 42.2.6 版本开始,它还通过 PGConnection 扩展接口向用户应用程序公开参数状态信息。

org.postgresql.PGConnection 上的两个方法为报告的参数提供客户端接口。参数名称不区分大小写,但保留大小写。

  • Map PGConnection.getParameterStatuses() - 返回所有报告参数及其值的映射。

  • String PGConnection.getParameterStatus() - 用于通过名称检索一个值,如果未报告任何值,则返回 null。

有关详细信息,请参阅 PGConnection JavaDoc。

如果您直接使用 java.sql.Connection,则可以

import org.postgresql.PGConnection;

void my_function(Connection conn) {
    System.out.println("My application name is " + ((PGConnection) conn).getParameterStatus("application_name"));
}

libpq 的等效项是 PQparameterStatus(...) API 函数。

Postgres 9.4(于 2014 年 12 月发布)引入了一项名为逻辑复制的新功能。逻辑复制允许将数据库中的更改实时流式传输到外部系统。物理复制和逻辑复制之间的区别在于,逻辑复制以逻辑格式发送数据,而物理复制以二进制格式发送数据。此外,逻辑复制可以发送单个表或数据库。二进制复制以全有或全无的方式复制整个集群;也就是说,无法使用二进制复制获取特定表或数据库。

在逻辑复制之前,实时保持外部系统同步存在问题。应用程序必须更新/使相应的缓存条目失效,重新索引搜索引擎中的数据,将其发送到分析系统,等等。

这会导致竞争条件和可靠性问题。例如,如果稍微不同的数据被写入两个不同的数据存储(可能是由于错误或竞争条件),数据存储的内容将逐渐偏离——它们会随着时间的推移变得越来越不一致。从这种逐渐的数据损坏中恢复是困难的。

逻辑解码获取数据库的预写日志 (WAL),并让我们访问行级更改事件:每次在表中插入、更新或删除一行时,都会产生一个事件。这些事件按事务分组,并按它们提交到数据库的顺序出现。已中止/回滚的事务不会出现在流中。因此,如果您按相同的顺序应用更改事件,您最终将获得数据库的精确、事务一致的副本。它看起来像您之前在应用程序中实现的事件溯源模式,但现在它可以从 PostgreSQL® 数据库中开箱即用。

为了访问实时更改,PostgreSQL® 提供了流式复制协议。复制协议可以是物理的或逻辑的。物理复制协议用于主/从复制。逻辑复制协议可用于将更改流式传输到外部系统。

由于 JDBC API 不包含复制,PGConnection 实现 PostgreSQL® API

您的数据库应配置为启用逻辑或物理复制。

  • 属性 max_wal_senders 应该至少等于复制消费者的数量。
  • 属性 wal_keep_segments 应该包含无法从数据库中删除的 WAL 段的数量。
  • 逻辑复制的属性 wal_level 应该等于 logical
  • 逻辑复制的属性 max_replication_slots 应该大于零,因为逻辑复制在没有复制槽的情况下无法工作。

启用具有复制权限的用户连接到复制流。

local   replication   all                   trust
host    replication   all   127.0.0.1/32    md5
host    replication   all   ::1/128         md5

postgresql.conf

max_wal_senders = 4             # max number of walsender processes
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
wal_level = logical             # minimal, replica, or logical
max_replication_slots = 4       # max number of replication slots

pg_hba.conf

# Allow replication connections from localhost, by a user with the
# replication privilege.
local   replication   all                   trust
host    replication   all   127.0.0.1/32    md5
host    replication   all   ::1/128         md5

逻辑复制使用复制槽来预留服务器上的WAL日志,并定义要使用的解码插件来将WAL日志解码为所需格式,例如,您可以将更改解码为json、protobuf等。为了演示如何使用pgJDBC复制API,我们将使用test_decoding插件,该插件包含在postgresql-contrib包中,但您可以使用自己的解码插件。GitHub上有一些可以作为示例使用的插件。

为了使用复制API,连接必须在复制模式下创建,在这种模式下,连接不可用于执行SQL命令,只能与复制API一起使用。这是PostgreSQL®强加的限制。

String url = "jdbc:postgresql://localhost:5432/postgres";
Properties props = new Properties();
PGProperty.USER.set(props, "postgres");
PGProperty.PASSWORD.set(props, "postgres");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(url, props);
PGConnection replConnection = con.unwrap(PGConnection.class);

整个复制API都分组在org.postgresql.replication.PGReplicationConnection中,可以通过org.postgresql.PGConnection#getReplicationAPI访问。

在您开始复制协议之前,您需要拥有复制槽,也可以通过 pgJDBC API 创建。

replConnection.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName("demo_logical_slot")
    .withOutputPlugin("test_decoding")
    .make();

拥有复制槽后,我们可以创建 ReplicationStream。

PGReplicationStream stream =
    replConnection.getReplicationAPI()
        .replicationStream()
        .logical()
        .withSlotName("demo_logical_slot")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .start();

复制流将发送自创建复制槽以来的所有更改,或者如果槽已用于复制,则从复制槽重启 LSN 发送所有更改。您也可以从特定 LSN 位置开始流式传输更改,在这种情况下,在创建复制流时应指定 LSN 位置。

LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");

PGReplicationStream stream =
    replConnection.getReplicationAPI()
        .replicationStream()
        .logical()
        .withSlotName("demo_logical_slot")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .withStartPosition(waitLSN)
        .start();

通过withSlotOption,我们还可以指定发送到输出插件的选项,这允许用户自定义解码。例如,我有一个自己的输出插件,它有一个属性sensitive=true,它将包含敏感列的更改以更改事件。

BEGIN 105779
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'previous value'
COMMIT 105779
BEGIN
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'previous value'
COMMIT

在复制过程中,数据库和消费者定期交换 ping 消息。当数据库或客户端在配置的超时时间内未收到 ping 消息时,复制被认为已停止,将抛出异常,数据库将释放资源。在 PostgreSQL® 中,ping 超时由属性wal_sender_timeout(默认值为 60 秒)配置。pgjdc 中的复制流可以配置为在需要时或按时间间隔发送反馈(ping)。建议比配置的wal_sender_timeout 更频繁地向数据库发送反馈(ping)。在生产系统中,我使用等于wal_sender_timeout / 3 的值。它避免了网络和更改在超时断开连接之前被流式传输的潜在问题。要指定反馈间隔,请使用withStatusInterval 方法。

PGReplicationStream stream =
    replConnection.getReplicationAPI()
        .replicationStream()
        .logical()
        .withSlotName("demo_logical_slot")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .withStatusInterval(20, TimeUnit.SECONDS)
        .start();

创建 PGReplicationStream 后,就可以开始实时接收变更了。

变更可以从流中以阻塞方式 (org.postgresql.replication.PGReplicationStream#read) 或非阻塞方式 (org.postgresql.replication.PGReplicationStream#readPending) 接收。两种方法都将变更接收为 java.nio.ByteBuffer,其中包含来自发送输出插件的有效负载。我们无法接收部分消息,只能接收输出插件发送的完整消息。ByteBuffer 中包含由解码输出插件定义的格式的消息,它可以是简单的字符串、json 或插件确定的任何内容。这就是 pgJDBC 返回原始 ByteBuffer 而不是进行假设的原因。

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);

示例 9.12. 通过复制流接收变更。

while (true) {
    //non blocking receive message
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10 L);
        continue;
    }

    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    System.out.println(new String(source, offset, length));
}

如前所述,复制流应定期向数据库发送反馈,以防止因超时而断开连接。当调用 `read` 或 `readPending` 时,如果需要发送反馈,则会自动发送反馈。也可以通过 `org.postgresql.replication.PGReplicationStream#forceUpdateStatus()` 发送反馈,无论超时与否。反馈的另一个重要职责是向服务器提供已成功接收并应用于消费者的逻辑序列号 (LSN),这对于监控以及截断/归档不再需要的 WAL 来说是必要的。如果复制已重新启动,它将从上次通过反馈发送到数据库的成功处理的 LSN 开始。

该 API 提供以下反馈机制,以指示当前消费者成功应用的 LSN。在此之前的 LSN 可以被截断或归档。`org.postgresql.replication.PGReplicationStream#setFlushedLSN` 和 `org.postgresql.replication.PGReplicationStream#setAppliedLSN`。您始终可以通过 `org.postgresql.replication.PGReplicationStream#getLastReceiveLSN` 获取最后接收的 LSN。

while (true) {
    //Receive last successfully send to queue message. LSN ordered.
    LogSequenceNumber successfullySendToQueue = getQueueFeedback();
    if (successfullySendToQueue != null) {
        stream.setAppliedLSN(successfullySendToQueue);
        stream.setFlushedLSN(successfullySendToQueue);
    }

    //non blocking receive message
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10 L);
        continue;
    }

    asyncSendToQueue(msg, stream.getLastReceiveLSN());
}
String url = "jdbc:postgresql://localhost:5432/test";
Properties props = new Properties();
PGProperty.USER.set(props, "postgres");
PGProperty.PASSWORD.set(props, "postgres");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(url, props);
PGConnection replConnection = con.unwrap(PGConnection.class);

replConnection.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName("demo_logical_slot")
    .withOutputPlugin("test_decoding")
    .make();

//some changes after create replication slot to demonstrate receive it
sqlConnection.setAutoCommit(true);
Statement st = sqlConnection.createStatement();
st.execute("insert into test_logic_table(name) values('first tx changes')");
st.close();

st = sqlConnection.createStatement();
st.execute("update test_logic_table set name = 'second tx change' where pk = 1");
st.close();

st = sqlConnection.createStatement();
st.execute("delete from test_logic_table where pk = 1");
st.close();

PGReplicationStream stream =
    replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("demo_logical_slot")
    .withSlotOption("include-xids", false)
    .withSlotOption("skip-empty-xacts", true)
    .withStatusInterval(20, TimeUnit.SECONDS)
    .start();

while (true) {
    //non blocking receive message
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10 L);
        continue;
    }

    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    System.out.println(new String(source, offset, length));

    //feedback
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());
}

当输出看起来像这样时,每行都是一条单独的消息。

BEGIN
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'first tx changes'
COMMIT
BEGIN
table public.test_logic_table: UPDATE: pk[integer]:1 name[character varying]:'second tx change'
COMMIT
BEGIN
table public.test_logic_table: DELETE: pk[integer]:1
COMMIT

物理复制的 API 看起来像逻辑复制的 API。物理复制不需要复制槽。ByteBuffer 将包含 WAL 日志的二进制形式。二进制 WAL 格式是一个非常底层的 API,并且可能会在不同版本之间发生变化。这就是为什么不同主要 PostgreSQL® 版本之间的复制不可行。但是物理复制可以包含许多逻辑复制中不可用的重要数据。这就是为什么 pgJDBC 包含了对两者的实现。

示例 9.15。使用物理复制

LogSequenceNumber lsn = getCurrentLSN();

Statement st = sqlConnection.createStatement();
st.execute("insert into test_physic_table(name) values('previous value')");
st.close();

PGReplicationStream stream =
    pgConnection
    .getReplicationAPI()
    .replicationStream()
    .physical()
    .withStartPosition(lsn)
    .start();

ByteBuffer read = stream.read();

PostgreSQL® 为数组数据类型提供了强大的支持,作为列类型、函数参数和 where 子句中的条件。使用 pgJDBC 创建数组有几种方法。

可以使用 java.sql. Connection.createArrayOf(String, Object[])Object[] 实例创建 java.sql. Array(注意:这包括原始和对象多维数组)。类似的方法 org.postgresql.PGConnection.createArrayOf(String, Object) 为原始数组类型提供支持。从这些方法返回的 java.sql.Array 对象可以在其他方法中使用,例如 PreparedStatement.setArray(int, Array)

以下类型的数组支持请求中的二进制表示,并且可以在 PreparedStatement.setObject 中使用

Java 类型 支持的二进制 PostgreSQL® 类型 默认 PostgreSQL® 类型
short[] , Short[] int2[] int2[]
int[] , Integer[] int4[] int4[]
long[] , Long[] int8[] int8[]
float[] , Float[] float4[] float4[]
double[] , Double[] float8[] float8[]
boolean[] , Boolean[] bool[] bool[]
String[] varchar[] , text[] varchar[]
byte[][] bytea[] bytea[]

驱动程序提供了一个用于访问 COPY 的扩展。Copy 是 PostreSQL 提供的扩展。参见 Copy


/*
* DDL for code below
* create table copytest (stringvalue text, intvalue int, numvalue numeric(5,2));
*/
private static String[] origData =
            {"First Row\t1\t1.10\n",
                    "Second Row\t2\t-22.20\n",
                    "\\N\t\\N\t\\N\n",
                    "\t4\t444.40\n"};
private int dataRows = origData.length;
private String sql = "COPY copytest FROM STDIN";

try (Connection con = DriverManager.getConnection(url, "postgres", "somepassword")){
    PGConnection pgConnection = con.unwrap(org.postgresql.PGConnection.class);
    CopyManager copyAPI = pgConnection.getCopyAPI();
    CopyIn cp = copyAPI.copyIn(sql);

    for (String anOrigData : origData) {
        byte[] buf = anOrigData.getBytes();
        cp.writeToCopy(buf, 0, buf.length);
    }

    long updatedRows = cp.endCopy();
    long handledRowCount = cp.getHandledRowCount();
    System.err.println(String.format("copy Updated %d Rows, and handled %d rows", updatedRows, handledRowCount));

    int rowCount = getCount(con);
    System.err.println(rowCount);

}
String sql = "COPY copytest TO STDOUT";
try (Connection con = DriverManager.getConnection(url, "postgres", "somepassword")){
    PGConnection pgConnection = con.unwrap(org.postgresql.PGConnection.class);
    CopyManager copyAPI = pgConnection.getCopyAPI();
    CopyOut cp = copyAPI.copyOut(sql);
    int count = 0;
    byte[] buf;  // This is a relatively simple example. buf will contain rows from the database

    while ((buf = cp.readFromCopy()) != null) {
        count++;
    }
    long rowCount = cp.getHandledRowCount();
}

更多示例可以在 复制测试代码中找到