
Kinesis connector #474

Closed
wants to merge 2 commits
1 change: 1 addition & 0 deletions pom.xml
@@ -81,6 +81,7 @@
<module>presto-jmx</module>
<module>presto-record-decoder</module>
<module>presto-kafka</module>
<module>presto-kinesis</module>
<module>presto-redis</module>
<module>presto-accumulo</module>
<module>presto-cassandra</module>
43 changes: 43 additions & 0 deletions presto-kinesis/README.md
@@ -0,0 +1,43 @@
# Kinesis Connector

Kinesis is Amazon’s fully managed cloud-based service for real-time processing
of large, distributed data streams.

This connector allows the use of Kinesis streams as tables in Presto, such that each data blob (message)
in a Kinesis stream is presented as a row in Presto. A flexible table-mapping approach lets
fields of the messages be treated as columns in the table.

Under the hood, a Kinesis shard iterator is used to retrieve the records, along with
a series of getRecords calls. By default the shard iterator starts 24 hours before
the current time and works its way forward (this is now configurable, so it can
be changed as needed). To be able to query a stream, table mappings are needed, as
explained in [Table Definitions](https://github.com/stitchfix/presto-kinesis/wiki/Table-Definitions).
These table definitions can be stored on Amazon S3 (preferred) or in
a local directory on each Presto node.
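As an illustrative sketch of the local-directory option (the stream, table, and bucket names below are hypothetical), a minimal table definition can be dropped into the table-description directory, or uploaded to S3 when ``kinesis.table-descriptions-s3`` is configured:

```shell
# Hypothetical minimal table definition for a stream named "orders".
mkdir -p etc/kinesis
cat > etc/kinesis/orders.json <<'EOF'
{
  "tableName": "orders",
  "schemaName": "default",
  "streamName": "orders",
  "message": {
    "dataFormat": "json",
    "fields": [
      { "name": "order_id", "type": "BIGINT", "mapping": "order_id" }
    ]
  }
}
EOF

# Alternative: store the definition on S3 (bucket name is a placeholder).
# aws s3 cp etc/kinesis/orders.json s3://my-bucket/presto-tables/orders.json
```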

This version has been updated to work with Presto 0.149 and the latest
version of the Amazon AWS SDKs. Other improvements and bug fixes have also been made.

> This is a read-only connector. It can only fetch data from Kinesis
> streams; it cannot create streams or push data into existing streams.


# Building

mvn clean package

This will create ``target/presto-kinesis-<version>-bundle.tar.gz``
file which contains the connector code and its dependency jars.

# Installation

You will need to augment your Presto installation on the coordinator and worker nodes so that the connector is loaded and configured properly. We will use ``$PRESTO_HOME`` to refer to the Presto installation directory.

* Create a ``kinesis.properties`` file in the ``$PRESTO_HOME/etc/catalog`` directory. See [Connector Configuration](https://github.com/stitchfix/presto-kinesis/wiki/Connector-Configuration).
* Create a JSON table definition file for every presto-kinesis table. See [Table Definitions](https://github.com/stitchfix/presto-kinesis/wiki/Table-Definitions). These can be placed on S3 or in a local directory (by default ``$PRESTO_HOME/etc/kinesis``).
* Copy the contents of the tarred bundle to ``$PRESTO_HOME/plugin/presto-kinesis`` (create the directory if necessary).
* Restart the Presto server for the changes to take effect.
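The steps above can be sketched in shell. Everything here is illustrative: ``/tmp/presto-demo`` stands in for the real ``$PRESTO_HOME``, and the properties written are just the minimal ones from the sample file.

```shell
# PRESTO_HOME is a placeholder for the actual Presto installation directory.
PRESTO_HOME=/tmp/presto-demo

# 1. Create the catalog properties file so Presto loads the connector.
mkdir -p "$PRESTO_HOME/etc/catalog"
cat > "$PRESTO_HOME/etc/catalog/kinesis.properties" <<'EOF'
connector.name=kinesis
kinesis.default-schema=default
kinesis.aws-region=us-east-1
EOF

# 2. Create the default local directory for JSON table definitions.
mkdir -p "$PRESTO_HOME/etc/kinesis"

# 3. Create the plugin directory and unpack the bundle into it
#    (shown commented out; run it against the real bundle from the build).
mkdir -p "$PRESTO_HOME/plugin/presto-kinesis"
# tar -xzf target/presto-kinesis-<version>-bundle.tar.gz \
#     -C "$PRESTO_HOME/plugin/presto-kinesis"

# 4. Restart Presto (the launcher path varies by installation).
# "$PRESTO_HOME/bin/launcher" restart
```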

This distribution contains a sample ``kinesis.properties`` file (in ``etc/catalog``) that can be used as a starting point.
The comment above each property summarizes what it is for.
62 changes: 62 additions & 0 deletions presto-kinesis/etc/catalog/kinesis.properties
@@ -0,0 +1,62 @@
# Configuration for the kinesis connector
#
# This is a sample configuration file for the Kinesis connector.
# The values given here are the defaults.

# Connector name, usually keep this as kinesis
connector.name=kinesis

# kinesis.default-schema
kinesis.default-schema=default

# directory of table definitions (defaults to etc/kinesis)
#kinesis.table-description-dir=etc/kinesis/

# S3 bucket and directory in URL format. This is an alternative to the
# local directory, and overrides it when set.
#kinesis.table-descriptions-s3=s3://bucket.name/directory/path

# AWS credentials. Omit these to use the instance provider credentials
#kinesis.access-key=<your-amazon-access-key>
#kinesis.secret-key=<your-amazon-secret-key>

# region (default is us-east-1)
kinesis.aws-region=us-east-1

# Expose or hide internal columns (false is recommended and the default)
kinesis.hide-internal-columns=false

# Maximum number of records to return per batch (upper limit 10000)
kinesis.batch-size=10000

# Maximum number of batches to read in a single query
kinesis.max-batches=100

# Maximum number of fetches to try when no records are returned
kinesis.fetch-attempts=2

# Sleep duration between calls to getRecords in Kinesis (default 1000ms)
kinesis.sleep-time=1000ms

# Use an initial shard iterator type of AT_TIMESTAMP, starting
# iter-offset-seconds before the current time
kinesis.iter-from-timestamp=true

# When iter-from-timestamp is true, the shard iterator will start at
# iter-offset-seconds before the current time
kinesis.iter-offset-seconds=86400

# Log one line at INFO level each time a Kinesis batch is read, with some useful info
#kinesis.log-batches=true

# Enable query checkpointing via DynamoDB (configure the related
# properties below if needed)
kinesis.checkpoint-enabled=false

# Properties related to checkpointing with DynamoDB
#kinesis.dynamo-read-capacity=50
#kinesis.dynamo-write-capacity=10
#kinesis.checkpoint-interval-ms=60000ms
#kinesis.checkpoint-logical-name=process1
#kinesis.iteration-number=0

38 changes: 38 additions & 0 deletions presto-kinesis/etc/kinesis/testtable.json
@@ -0,0 +1,38 @@
{
"tableName": "test_table",
"schemaName": "prod",
"streamName": "test_kinesis_stream",
"message": {
"dataFormat": "json",
"fields": [
{
"name": "client_id",
"type": "BIGINT",
"mapping": "client_id",
"comment": "The client ID field"
},
{
"name": "acct_balance",
"type": "DOUBLE",
"mapping": "acct_balance",
"comment": "Current account balance"
},
{
"name": "service_type",
"mapping": "service_type",
"type": "VARCHAR(20)"
},
{
"name": "signup_date",
"mapping": "signup_date",
"type": "DATE",
"dataFormat": "iso8601"
}
]
},
"comment" : "This test adds some extra fields to make sure they are ignored and don't cause issues.",
"client_metadata" : {
"name" : "Sample Query",
"query" : "select client_id, service_type, signup_date, _shard_id, _message_length from prod.test_table"
}
}