Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.
/ pynesis Public archive
forked from patoroco/pynesis

High level python library for using kinesis streams

License

Notifications You must be signed in to change notification settings

syapse/pynesis

 
 

Repository files navigation

pynesis CircleCI PyPI version Python Versions

High level kinesis client. Support python 2.7 and 3.6, and has helpers for using it within Django.

Some features:

  • Supports python 2 & 3
  • Django helpers included
  • Automatically detects shard count changes
  • Checkpoints/sequences persistence can be customized
  • Provided Checkpointer implementations for memory, django model and redis
  • Provided Dummy kinesis implementation for development/testing

Some limitations:

  • Single threaded/sequential. It will read from all shards in a round-robin fashion

Installation

Install the latest stable version from Pypi with pip install pynesis

Usage

from pynesis.streams import KinesisStream

# Get a reference to the stream we want to work with
stream = KinesisStream("my-stream", region_name="eu-west-2")

# Write to the stream
stream.put("key", "my message".encode("utf-8"))

# Read from the stream
for record in stream.read():
    print(record)

Now persisting the sequences:

from pynesis.streams import KinesisStream
from pynesis.checkpointers import RedisCheckpointer

checkpointer = RedisCheckpointer(redis_host="localhost")
stream = KinesisStream("my-stream", region_name="eu-west-2", checkpointer=checkpointer)

for record in stream.read():
    print(record)

stream.read() returns a KinesisRecord on each iteration which has the following instance attributes for accessing the details of the raw record:

  • sequence_number
  • approximate_arrival_timestamp
  • data
  • partition_key

See the examples available here for more details

Django support

If you are using django, you can configure your kinesis streams in the standard django settings.py file, see here for an example.

You can use also the provided Django model based Checkpointer if you want to save streams sequences into the database instead of redis, for using it, just add pynesis to INSTALLED_APPS and run the provided migration with manage.py migrate.

Development environment

Run make shell and then use the tests/examples.

Running tests

To run all tests in all environments and python versions supported, run:

make test

To run a single test in a single environment, from within a make shell run:

tox -e py36-dj19-redis -- pynesis/tests/examples_tests.py::pynesis/tests/example_test.py::test_simple_reading_example

About

High level python library for using kinesis streams

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 92.0%
  • Shell 5.1%
  • Makefile 2.9%