Generate and load Elasticsearch indexes based on Table Schema descriptors.
- implements the tableschema.Storage interface
The package uses semantic versioning, which means that major versions could include breaking changes. It's highly recommended to specify a version range in your setup/requirements file, e.g. tableschema-elasticsearch>=1.0,<2.0.
pip install tableschema-elasticsearch
import elasticsearch
import tableschema_elasticsearch

INDEX_NAME = 'testing_index'

# Connect to an Elasticsearch instance running on localhost
es = elasticsearch.Elasticsearch()
storage = tableschema_elasticsearch.Storage(es)

# List all indexes
print(list(storage.buckets))

# Create a new index
storage.create(INDEX_NAME, {
    'fields': [
        {
            'name': 'num',
            'type': 'number'
        }
    ]
})

# Write data to the index; 'num' is the primary key, so the overlapping
# rows 500-999 in the second write update existing documents
l = list(storage.write(INDEX_NAME, ({'num': i} for i in range(1000)), ['num']))
print(len(l))
print(l[:10], '...')
l = list(storage.write(INDEX_NAME, ({'num': i} for i in range(500, 1500)), ['num']))
print(len(l))
print(l[:10], '...')

# Read all data from the index
storage = tableschema_elasticsearch.Storage(es)
print(list(storage.buckets))
l = list(storage.read(INDEX_NAME))
print(len(l))
print(l[:10])
In this driver, the elasticsearch package is used as the database wrapper. We can get a storage instance this way:
from elasticsearch import Elasticsearch
from tableschema_elasticsearch import Storage

engine = Elasticsearch()
storage = Storage(engine)
Then we can interact with the storage ('buckets' are Elasticsearch indexes in this context):
storage.buckets # iterator over bucket names
storage.create('bucket', descriptor,
reindex=False,
always_recreate=False,
mapping_generator_cls=None)
# reindex will copy existing documents from an existing index with the same name (in case of a mapping conflict)
# always_recreate will always recreate an index, even if it already exists. default is to update mappings only.
# mapping_generator_cls allows customization of the generated mapping
storage.delete('bucket')
storage.describe('bucket') # return descriptor, not implemented yet
storage.iter('bucket') # yield rows
storage.read('bucket') # return rows
storage.write('bucket', rows, primary_key,
as_generator=False)
# primary_key is a list of field names which will be used to generate document ids
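Because primary_key determines the document id, writing rows whose keys already exist updates those documents in place instead of duplicating them (the overlapping ranges in the quickstart above demonstrate this). Below is a minimal sketch of streaming writes, reusing storage and INDEX_NAME from the examples above; it assumes as_generator=True makes write yield each row lazily, mirroring the iterable that the quickstart materializes with list():

# Stream rows through the storage, reporting progress without
# buffering all written rows in memory
rows = ({'num': i} for i in range(1000))
for i, row in enumerate(storage.write(INDEX_NAME, rows, ['num'], as_generator=True)):
    if i % 250 == 0:
        print('rows written so far:', i + 1)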
When creating indexes, we always create an index with a semi-random name and a matching alias that points to it. This allows us to decide whether to re-index documents whenever we're re-creating an index, or to discard the existing records.
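For illustration, here is roughly what that scheme looks like when done by hand with the elasticsearch package (the index and alias names below are made up; the driver generates its own):

import uuid
import elasticsearch

es = elasticsearch.Elasticsearch()
alias = 'my-bucket'
# Semi-random concrete index name behind the stable alias
index = '{}_{}'.format(alias, uuid.uuid4().hex[:8])
es.indices.create(index=index)
# Readers and writers address the alias; re-creating the bucket can then
# point the alias at a fresh index, optionally re-indexing into it first
es.indices.put_alias(index=index, name=alias)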
When creating indexes, the tableschema types are converted to ES types and a mapping is generated for the index.
Some special properties in the schema provide extra information for generating the mapping:
- array types also need to have the es:itemType property, which specifies the inner data type of the array items.
- object types also need to have the es:schema property, which provides a Table Schema for the inner document contained in that object (or have es:enabled=false to disable indexing of that field).
Example:
{
"fields": [
{
"name": "my-number",
"type": "number"
},
{
"name": "my-array-of-dates",
"type": "array",
"es:itemType": "date"
},
{
"name": "my-person-object",
"type": "object",
"es:schema": {
"fields": [
{"name": "name", "type": "string"},
{"name": "surname", "type": "string"},
{"name": "age", "type": "integer"},
{"name": "date-of-birth", "type": "date", "format": "%Y-%m-%d"}
]
}
},
{
"name": "my-library",
"type": "array",
"es:itemType": "object",
"es:schema": {
"fields": [
{"name": "title", "type": "string"},
{"name": "isbn", "type": "string"},
{"name": "num-of-pages", "type": "integer"}
]
}
},
{
"name": "my-user-provded-object",
"type": "object",
"es:enabled": false
}
]
}
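A descriptor like this is used the same way as any other. A short sketch, where the bucket name and row values are made up for illustration and descriptor holds the document above:

storage.create('people', descriptor)
storage.write('people', [{
    'my-number': 7,
    'my-array-of-dates': ['2017-01-01', '2017-06-01'],
    'my-person-object': {'name': 'Jane', 'surname': 'Doe', 'age': 33,
                         'date-of-birth': '1984-01-01'},
    'my-library': [{'title': 'A Book', 'isbn': '123-4-56', 'num-of-pages': 100}],
    'my-user-provided-object': {'anything': 'goes', 'in': 'here'},
}], ['my-number'])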
By providing a custom mapping generator class (via mapping_generator_cls) that inherits from the MappingGenerator class, you should be able to customize how the Elasticsearch mapping is generated from the descriptor.
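For instance, a subclass could force Table Schema string fields to be indexed as Elasticsearch keyword fields. The sketch below assumes MappingGenerator lives in tableschema_elasticsearch.mappers and exposes a _convert_type(schema_type, field, prefix) hook; check the driver's source for the exact module path and extension points before relying on these names:

from tableschema_elasticsearch.mappers import MappingGenerator

class KeywordMappingGenerator(MappingGenerator):
    # Assumed hook: returns the Elasticsearch property definition
    # for a single Table Schema field
    @classmethod
    def _convert_type(cls, schema_type, field, prefix):
        if schema_type == 'string':
            return {'type': 'keyword'}
        return super(KeywordMappingGenerator, cls)._convert_type(schema_type, field, prefix)

storage.create('bucket', descriptor, mapping_generator_cls=KeywordMappingGenerator)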
Storage(self, es=None)
Elasticsearch Tabular Storage.
The package implements the Tabular Storage interface (see the tableschema documentation for the full description); only the additional API is documented here.
Arguments
- es (object): Elasticsearch instance
storage.create(self, bucket, descriptor, reindex=False, always_recreate=False, mapping_generator_cls=None, index_settings=None)
Create index with mapping by schema.
Arguments
- bucket(str): Name of index to be created
- descriptor: Descriptor of index to be created
- always_recreate: Delete the index if it already exists (otherwise just update the mapping)
- reindex: On mapping mismatch, automatically create a new index and migrate existing documents to it
- mapping_generator_cls: subclass of MappingGenerator
- index_settings: settings which will be used in index creation
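For example, assuming index_settings is passed straight through as the index settings body at creation time, standard Elasticsearch settings apply (the values below are illustrative):

storage.create(
    'bucket',
    descriptor,
    index_settings={
        'number_of_shards': 1,
        'number_of_replicas': 0,
    }
)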
storage.delete(self, bucket=None)
Delete an index and its mapping.
Arguments
- bucket(str): Name of index to delete
The project follows the Open Knowledge International coding standards.
The recommended way to get started is to create and activate a project virtual environment. To install the package and development dependencies into the active environment:
$ make install
To run tests with linting and coverage:
$ make test
Only breaking and the most important changes are described here. The full changelog and documentation for all released versions can be found in the nicely formatted commit history.
- Initial driver implementation