This module provides Python packages to interface with CLP Core Features through CLP's FFI (foreign function interface). At present, this library supplies built-in functions for serializing/deserializing log messages using CLP.
# Install the latest version
python3 -m pip install --upgrade clp-ffi-py
Note:
- Python 3.7 or higher is required.
- Tested on Linux, macOS and Windows.
To install an older version or download the prebuilt whl
package, check the
project homepage on PyPI here.
Tested on Python 3.7, 3.8, 3.11, 3.12, and 3.13, and it should work on any Python version >= 3.7.
The API reference for this library can be found on our docs hub.
To manually build a package for distribution, follow the steps below.
- A C++ compiler that supports C++20 and
std::span
, e.g:clang++
>= 7g++
>= 10MSVC
>= 1930 (included in Visual Studio 2022)
- python3
- python3-dev
- python3-venv
- Task >= 3.38.0
- Initialize and update yscope-dev-utils submodules:
git submodule update --init --recursive tools/yscope-dev-utils
-
Build a Python wheel incrementally:
task
The command above will generate both a
.tar.gz
and.whl
package under./build/dist/
. -
Clean up the build:
task clean
The CLP key-value pair IR stream, introduced in version 0.0.14, is a new IR stream format that enables efficient serialization of key-value pair (kv-pair) log events.
We categorize the kv-pairs of a log event into two categories:
- Auto-generated kv-pairs: KV-pairs (e.g., timestamps, log levels, other metadata) that are automatically generated by the logging library.
- User-generated kv-pairs: Custom kv-pairs (e.g., log messages).
The serialization interface requires that kv-pairs are passed as MessagePack-encoded Map objects, where keys and values are restricted to the following MessagePack types described below.
Keys must be UTF-8-encoded strings.
Values must be one of the following MessagePack-types:
- Primitives:
- Integer
- Float
- String
- Boolean
- Null
- Maps with keys and values that have the same supported types described here.
- Arrays containing a sequence of supported primitives, arrays, or maps.
MessagePack's Binary
and Extension
types are not supported.
from clp_ffi_py.ir import Serializer
from clp_ffi_py.utils import serialize_dict_to_msgpack
with open("example.clp", "wb") as ir_stream, Serializer(ir_stream) as serializer:
serializer.serialize_log_event_from_msgpack_map(
auto_gen_msgpack_map=serialize_dict_to_msgpack({"level": "INFO"}),
user_gen_msgpack_map=serialize_dict_to_msgpack({"message": "Service started."}),
)
serializer.serialize_log_event_from_msgpack_map(
auto_gen_msgpack_map=serialize_dict_to_msgpack({"level": "WARN"}),
user_gen_msgpack_map=serialize_dict_to_msgpack({"uid": 12345, "ip": "127.0.0.1"}),
)
clp_ffi_py.utils.serialize_dict_to_msgpack
can be used to serialize a Python dictionary object
into a MessagePack object.
from clp_ffi_py.ir import Deserializer, KeyValuePairLogEvent
from typing import Optional
with open("example.clp", "rb") as ir_stream:
deserializer = Deserializer(ir_stream)
while True:
log_event: Optional[KeyValuePairLogEvent] = deserializer.deserialize_log_event()
if log_event is None:
# The entire stream has been consumed
break
auto_gen_kv_pairs, user_gen_kv_pairs = log_event.to_dict()
print(auto_gen_kv_pairs)
print(user_gen_kv_pairs)
Deserializer.deserialize_log_event
can be used to read from the IR stream and outputKeyValuePairLogEvent
objects.KeyValuePairLogEvent.to_dict
can be used to convert the underlying deserialized results into Python dictionaries.
Important
The current Deserializer
does not support reading the previous IR stream format. Backward
compatibility will be added in future releases.
CLP IR Readers provide a convenient interface for CLP IR deserialization and search methods.
Important
The readers below do not support reading or searching CLP key-value pair IR streams.
- Read+deserialize any arbitrary CLP IR stream (as an instance of
IO[bytes]
). - Can be used as an iterator that returns each log event as a
LogEvent
object. - Can search target log events by giving a search query:
- Searching log events within a certain time range.
- Searching log messages that match certain wildcard queries.
- Simple wrapper around CLPIRStreamHandler that calls
open
with a given local path.
from pathlib import Path
from clp_ffi_py.ir import ClpIrFileReader
with ClpIrFileReader(Path("example.clp.zst")) as clp_reader:
for log_event in clp_reader:
# Print the log message with its timestamp properly formatted.
print(log_event.get_formatted_message())
Each log event is represented by a LogEvent
object, which offers methods to
retrieve its underlying details, such as the timestamp and the log message. For
more information, use the following code to see all the available methods and
the associated docstring.
from clp_ffi_py.ir import LogEvent
help(LogEvent)
from typing import List
from clp_ffi_py.ir import ClpIrStreamReader, LogEvent, Query, QueryBuilder
# Create a QueryBuilder object to build the search query.
query_builder: QueryBuilder = QueryBuilder()
# Create a search query that specifies a time range by UNIX epoch timestamp in
# milliseconds. It will search from 2016.Nov.28 21:00 to 2016.Nov.29 3:00.
time_range_query: Query = (
query_builder
.set_search_time_lower_bound(1480366800000) # 2016.11.28 21:00
.set_search_time_upper_bound(1480388400000) # 2016.11.29 03:00
.build()
)
# A list to store all the log events within the search time range
log_events: List[LogEvent] = []
# Open IRstream compressed log file as a binary file stream, then pass it to
# CLpIrStreamReader.
with open("example.clp.zst", "rb") as compressed_log_file:
with ClpIrStreamReader(compressed_log_file) as clp_reader:
for log_event in clp_reader.search(time_range_query):
log_events.append(log_event)
Example Code: Using Query to search log messages of certain pattern(s) specified by wildcard queries.
from pathlib import Path
from typing import List, Tuple
from clp_ffi_py.ir import ClpIrFileReader, Query, QueryBuilder
from clp_ffi_py.wildcard_query import FullStringWildcardQuery, SubstringWildcardQuery
# Create a QueryBuilder object to build the search query.
query_builder: QueryBuilder = QueryBuilder()
# Add wildcard patterns to filter log messages:
query_builder.add_wildcard_query(SubstringWildcardQuery("uid=*,status=failed"))
query_builder.add_wildcard_query(
FullStringWildcardQuery("*UID=*,Status=KILLED*", case_sensitive=True)
)
# Initialize a Query object using the builder:
wildcard_search_query: Query = query_builder.build()
# Store the log events that match the criteria in the format:
# [timestamp, message]
matched_log_messages: List[Tuple[int, str]] = []
# A convenience file reader class is also available to interact with a file that
# represents a CLP IR stream directly.
with ClpIrFileReader(Path("example.clp.zst")) as clp_reader:
for log_event in clp_reader.search(wildcard_search_query):
matched_log_messages.append((log_event.get_timestamp(), log_event.get_log_message()))
A Query
object may have both the search time range and the wildcard queries
(WildcardQuery
) specified to support more complex search scenarios.
QueryBuilder
can be used to conveniently construct Query objects. For more
details, use the following code to access the related docstring.
from clp_ffi_py.ir import Query, QueryBuilder
from clp_ffi_py import FullStringWildcardQuery, SubstringWildcardQuery, WildcardQuery
help(Query)
help(QueryBuilder)
help(WildcardQuery)
help(FullStringWildcardQuery)
help(SubstringWildcardQuery)
When working with CLP IR files stored on S3-compatible storage systems, smart_open can be used to open and read the IR stream for the following benefits:
- It only performs stream operation and does not download the file to the disk.
- It only invokes a single
GET
request so that the API access cost is minimized.
Here is an example:
from pathlib import Path
from clp_ffi_py.ir import ClpIrStreamReader
import boto3
import os
import smart_open
# Create a boto3 session by reading AWS credentials from environment variables.
session = boto3.Session(
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
)
url = 's3://clp-example-s3-bucket/example.clp.zst'
# Using `smart_open.open` to stream the CLP IR byte sequence:
with smart_open.open(
url, mode="rb", compression="disable", transport_params={"client": session.client("s3")}
) as istream:
with ClpIrStreamReader(istream, allow_incomplete_stream=True) as clp_reader:
for log_event in clp_reader:
# Print the log message with its timestamp properly formatted.
print(log_event.get_formatted_message())
Note:
- Setting
compression="disable"
is necessary so thatsmart_open
doesn't undo the IR file's Zstandard compression (based on the file's extension) before streaming it toClpIrStreamReader
;ClpIrStreamReader
expects the input stream to be Zstandard-compressed. - When
allow_incomplete_stream
is set to False (default), the reader will raiseclp_ffi_py.ir.IncompleteStreamError
if the stream is incomplete (it doesn't end with the byte sequence indicating the stream's end). In practice, this can occur if you're reading a stream that is still being written or wasn't properly closed.
The Query
and LogEvent
classes can be serialized by pickle. Therefore,
deserializing and searching can be parallelized across streams/files using libraries
such as multiprocessing and tqlm.
# 1. Create and enter a virtual environment
python -m venv venv && . ./venv/bin/activate
# 2. Install development dependencies
pip install -r requirements-dev.txt
# 3. Pull all submodules in preparation for building
git submodule update --init --recursive
# 4. Install
pip install -e .
# 5. Run unit tests
python -m unittest -bv
Note: If the package is installed from a whl
file into the site packages,
rather than installed locally (pip install -e .
), the tester cannot be
launched from the project's root directory. If unittest
is ran from the root
directory, the local clp_ffi_py
directory will shadow the clp_ffi_py
module
installed. To run the tester with the installed package, try the following:
cd tests
python -m unittest -bv
This project utilizes cibuildwheel configuration. Whenever modifications are made and committed to GitHub, the cibuildwheel Action will automatically initiate, building this library for several Python environments across diverse OS and architectures. You can access the build outcomes (wheel files) via the GitHub Action page. For instructions on customizing the build targets or running cibuildwheel locally, please refer to the official documentation of cibuildwheel.
Certain file types need to be added to our linting rules manually:
- CMake. If adding a CMake file, add it (or its parent directory) as an argument to the
gersemi
command in lint-tasks.yaml.- If adding a directory, the file must be named
CMakeLists.txt
or use the.cmake
extension.
- If adding a directory, the file must be named
- YAML. If adding a YAML file (regardless of its extension), add it as an argument to the
yamllint
command in lint-tasks.yaml.
Before submitting a pull request, ensure you’ve run the linting commands below and either fixed any violations or suppressed the warning.
To run all linting checks:
task lint:check
To run all linting checks AND automatically fix any fixable issues:
task lint:fix
The commands above run all linting checks, but for performance you may want to run a subset (e.g., if you only changed C++ files, you don't need to run the YAML linting checks) using one of the tasks in the table below.
Task | Description |
---|---|
lint:cmake-check |
Runs the CMake linters. |
lint:cmake-fix |
Runs the CMake linters and fixes any violations. |
lint:cpp-check |
Runs the C++ linters (formatters and static analyzers). |
lint:cpp-fix |
Runs the C++ linters and fixes some violations. |
lint:cpp-format-check |
Runs the C++ formatters. |
lint:cpp-format-fix |
Runs the C++ formatters and fixes some violations. |
lint:cpp-static-check |
Runs the C++ static analyzers. |
lint:py-check |
Runs the Python linters. |
lint:py-fix |
Runs the Python linters and fixes some violations. |
lint:yml-check |
Runs the YAML linters. |
lint:yml-fix |
Runs the YAML linters and fixes some violations. |