PipelineDB supports ingesting data from Kafka topics into streams. All of this functionality is contained in the pipeline_kafka extension. Internally, pipeline_kafka uses PostgreSQL's COPY infrastructure to transform Kafka messages into rows that PipelineDB understands.
Note
The pipeline_kafka extension is officially supported but does not ship with the PipelineDB packages and therefore must be installed separately. The repository for the extension is located here. Instructions for building and installing the extension can be found in the README.md file.
pipeline_kafka internally uses shared memory to sync state between background workers, so it must be loaded as a shared library. You can do so by adding the following line to your pipelinedb.conf file. If you're already loading other shared libraries, simply append pipeline_kafka to the comma-separated list.
shared_preload_libraries = pipeline_kafka
You can now load the extension into a database:
postgres=# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION
Before you can start using pipeline_kafka, you must add a broker for your Kafka deployment.
pipeline_kafka.add_broker ( hostname text )
hostname is a string of the form <host>[:<port>]. Multiple brokers can be added by calling pipeline_kafka.add_broker for each host.
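For example, assuming a single Kafka broker running locally on the default port:

SELECT pipeline_kafka.add_broker('localhost:9092');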
pipeline_kafka.consume_begin ( topic text, stream text, format := 'text', delimiter := E'\t', quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )
Launches parallelism background worker processes, each of which reads messages from the given Kafka topic into the given stream. The target stream must be created with CREATE STREAM
beforehand. All partitions of the given topic are spread evenly across the worker processes.
The optional format, delimiter, escape and quote arguments are analogous to the FORMAT, DELIMITER, ESCAPE and QUOTE options for the PostgreSQL COPY command, except that pipeline_kafka supports one additional format: json. The json format interprets each Kafka message as a JSON object.
batchsize controls the batch_size parameter passed to the Kafka client. We force a COPY and commit cycle after batchsize messages have been buffered.
maxbytes controls the fetch.message.max.bytes parameter passed to the Kafka client. We force a COPY and commit cycle after maxbytes of data have been buffered.
start_offset specifies the offset from which to start reading the Kafka topic partitions. pipeline_kafka continuously and durably saves the offset it has read up to in the database. If start_offset is NULL, we start from the saved offset, or from the end of the partition if there is no saved offset. A start_offset of -1 will start reading from the end of each partition, and -2 will start consuming from the beginning of each partition. Using any other start_offset would be an odd thing to do, since offsets are unrelated among partitions.
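As a sketch, the following call (topic and stream names are illustrative) consumes JSON messages from a clicks topic into a click_stream stream with two worker processes, assuming the stream has already been created with CREATE STREAM:

SELECT pipeline_kafka.consume_begin('clicks', 'click_stream',
  format := 'json', parallelism := 2);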
pipeline_kafka.consume_begin ( )
Same as above, but launches all previously created consumers rather than one for a specific stream-topic pair.
pipeline_kafka.consume_end ( topic text, stream text )
Terminates background consumer processes for the given stream-topic pair.
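For example, to stop the consumer started in the sketch above:

SELECT pipeline_kafka.consume_end('clicks', 'click_stream');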
pipeline_kafka.consume_end ( )
Same as above, but terminates all consumer processes.
New in version 0.9.1.
pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )
Produces a single message into the target topic. Both partition and key are optional. By default the partition remains unassigned, so the broker will decide which partition to produce the message to depending on the topic's partitioner function. If you want to produce the message into a specific partition, specify it as an integer. key is a bytea argument that will be used as the key to the partitioner function.
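For example, a minimal sketch that produces a message into a hypothetical clicks topic, letting the broker pick the partition:

SELECT pipeline_kafka.produce_message('clicks', 'hello world'::bytea);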
pipeline_kafka.emit_tuple ( topic, partition, key )
This is a trigger function that can be used to emit tuples into a Kafka topic in JSON format. It can only be used in an AFTER INSERT OR UPDATE, FOR EACH ROW trigger. In the case of an UPDATE, the new, updated tuple is emitted. A topic must be provided, whereas partition and key are both optional. Since this is a trigger function, all arguments must be passed as string literals and there is no way to specify keyword arguments. If you only want to specify a topic and key, use '-1' as the partition, which will keep the partition unassigned. key is the name of the column in the emitted tuple whose value should be used as the partition key.
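As a sketch (table, topic and column names are illustrative), the following trigger emits every inserted or updated row of a users table to a users_topic Kafka topic, keyed on the id column, with '-1' leaving the partition unassigned:

CREATE TRIGGER users_emit
  AFTER INSERT OR UPDATE ON users
  FOR EACH ROW
  EXECUTE PROCEDURE pipeline_kafka.emit_tuple('users_topic', '-1', 'id');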
pipeline_kafka uses several tables to durably keep track of its own state across system restarts:
pipeline_kafka.consumers
Stores the metadata for each stream-topic consumer that is created by pipeline_kafka.consume_begin.
pipeline_kafka.brokers
Stores all Kafka brokers that consumers can connect to.
pipeline_kafka.offsets
Stores Kafka topic offsets so that consumers can begin reading messages from where they left off before termination or system restarts.
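These tables can be queried like any other, which is useful for checking consumer progress. For example:

SELECT * FROM pipeline_kafka.offsets;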
Note
See SQL on Kafka for an in-depth tutorial on using Kafka with PipelineDB.
PipelineDB also supports ingesting data from Amazon Kinesis streams. This functionality is provided by the pipeline_kinesis extension. Internally, the extension manages background workers that consume data using the AWS SDK and copy it into PipelineDB streams.
The repository for the extension is located here. Instructions for building and installing the extension can be found in the README.md
file.
To enable the extension, it must be explicitly loaded:
postgres=# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION
To start ingestion, you must first tell pipeline_kinesis where and how to get Kinesis data by configuring an endpoint:
pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )
name is a unique identifier for the endpoint. region is a string identifying the AWS region, e.g. us-east-1 or us-west-2.
credfile is an optional parameter that allows overriding the default file location for AWS credentials.
url is an optional parameter that allows the use of a different (non-AWS) Kinesis server. This is mostly useful for testing with local Kinesis servers such as kinesalite.
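For example, a minimal sketch that registers an endpoint named ep (the name is illustrative) in the us-west-2 region, using the default credentials file and AWS servers:

SELECT pipeline_kinesis.add_endpoint('ep', 'us-west-2');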
pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := 'text', delimiter text := E'\t', quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )
Starts a logical consumer group that consumes messages from the Kinesis stream stream at the given endpoint and copies them into the PipelineDB stream relation.
parallelism is used to specify the number of background worker processes that should be used per consumer to balance load. Note - this does not need to be set to the number of shards, since the extension is internally threaded. The default value of 1 is sufficient unless the consumer starts to fall behind.
format, delimiter, escape and quote are optional parameters used to control the format of the copied rows, as in PostgreSQL COPY.
batchsize is passed on to the AWS SDK and controls the Limit parameter used in Kinesis GetRecords.
start_offset is used to control the stream position that the extension starts reading from. -1 is used to start reading from the end of the stream, and -2 to read from the start. Internally, these map to LATEST and TRIM_HORIZON, respectively. See Kinesis GetShardIterator for more details.
pipeline_kinesis.consume_end (endpoint text, stream text, relation text)
Terminates all background worker processes for a particular consumer.
pipeline_kinesis.consume_begin()
Launches all previously created consumers.
pipeline_kinesis.consume_end()
Terminates all background worker processes for all previously started consumers.
pipeline_kinesis uses several tables to durably keep track of its own state across system restarts:
pipeline_kinesis.endpoints
Stores the metadata for each endpoint that is created by pipeline_kinesis.add_endpoint.
pipeline_kinesis.consumers
Stores the metadata for each consumer that is created by pipeline_kinesis.consume_begin.
pipeline_kinesis.seqnums
Stores the per-shard metadata for each consumer, namely its sequence numbers.