Skip to content

opencollector/td-client-python

 
 

Repository files navigation

Treasure Data API library for Python

Build Status Build status Coverage Status PyPI version

Treasure Data API library for Python

Requirements

td-client supports the following versions of Python.

  • Python 3.5+
  • PyPy

Install

You can install the releases from PyPI.

$ pip install td-client

It'd be better to install certifi to enable SSL certificate verification.

$ pip install certifi

Examples

Please see also the examples at Treasure Data Documentation.

If you want to find API reference, see also API document.

Listing jobs

Treasure Data API key will be read from environment variable TD_API_KEY, if none is given via apikey= argument passed to tdclient.Client.

Treasure Data API endpoint https://api.treasuredata.com is used by default. You can override this with environment variable TD_API_SERVER, which in turn can be overridden via endpoint= argument passed to tdclient.Client. List of available Treasure Data sites and corresponding API endpoints can be found here.

import tdclient

with tdclient.Client() as td:
    for job in td.jobs():
        print(job.job_id)

Running jobs

Running jobs on Treasure Data.

import tdclient

with tdclient.Client() as td:
    job = td.query("sample_datasets", "SELECT COUNT(1) FROM www_access", type="hive")
    job.wait()
    for row in job.result():
        print(repr(row))

Running jobs via DBAPI2

td-client-python implements PEP 0249 Python Database API v2.0. You can use td-client-python with external libraries which supports Database API such like pandas.

import pandas
import tdclient

def on_waiting(cursor):
    print(cursor.job_status())

with tdclient.connect(db="sample_datasets", type="presto", wait_callback=on_waiting) as td:
    data = pandas.read_sql("SELECT symbol, COUNT(1) AS c FROM nasdaq GROUP BY symbol", td)
    print(repr(data))

We offer another package for pandas named pytd with some advanced features. You may prefer it if you need to do complicated things, such like exporting result data to Treasure Data, printing job's progress during long execution, etc.

Importing data

Importing data into Treasure Data in streaming manner, as similar as fluentd is doing.

import sys
import tdclient

with tdclient.Client() as td:
    for file_name in sys.argv[:1]:
        td.import_file("mydb", "mytbl", "csv", file_name)

Warning

Importing data in streaming manner requires certain amount of time to be ready to query since schema update will be executed with delay.

Bulk import

Importing data into Treasure Data in batch manner.

import sys
import tdclient
import uuid
import warnings

if len(sys.argv) <= 1:
    sys.exit(0)

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        for file_name in sys.argv[1:]:
            part_name = "part-{}".format{file_name}
            bulk_import.upload_file(part_name, "json", file_name)
        bulk_import.freeze()
    except:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    if 0 < bulk_import.error_records:
        warnings.warn("detected {} error records.".format(bulk_import.error_records))
    if 0 < bulk_import.valid_records:
        print("imported {} records.".format(bulk_import.valid_records))
    else:
        raise(RuntimeError("no records have been imported: {}".format(bulk_import.name)))
    bulk_import.commit(wait=True)
    bulk_import.delete()

If you want to import data as msgpack format, you can write as follows:

import io
import time
import uuid
import warnings

import tdclient

t1 = int(time.time())
l1 = [{"a": 1, "b": 2, "time": t1}, {"a": 3, "b": 9, "time": t1}]

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        _bytes = tdclient.util.create_msgpack(l1)
        bulk_import.upload_file("part", "msgpack", io.BytesIO(_bytes))
        bulk_import.freeze()
    except:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    # same as the above example

Development

Running tests

Run tests.

$ python setup.py test

Running tests (tox)

You can run tests against all supported Python versions. I'd recommend you to install pyenv to manage Pythons.

$ pyenv shell system
$ for version in $(cat .python-version); do [ -d "$(pyenv root)/versions/${version}" ] || pyenv install "${version}"; done
$ pyenv shell --unset

Install tox.

$ pip install tox

Then, run tox.

$ tox

Release

Release to PyPI. Ensure you installed twine.

$ python setup.py bdist_wheel sdist
$ twine upload dist/*

License

Apache Software License, Version 2.0

About

Treasure Data API library for Python

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 99.7%
  • Batchfile 0.3%