
Bug: Custom Serialization/Deserialization logic in Redis doesn't work unless UTF-8 serializable #2061

Open
chrisgoddard opened this issue Feb 5, 2025 · 1 comment
Labels
bug Something isn't working Redis Issues related to `faststream.redis` module and Redis features

Comments

@chrisgoddard

Describe the bug
I am working on a middleware that adds msgpack serialization to all messages (using ormsgpack, which natively handles Pydantic models).

The issue seems to be that FastStream JSON-serializes the message body together with its headers here:

# faststream/redis/parser.py
class RawMessage:
    # (line 85)

    @classmethod
    def encode(
        cls,
        *,
        message: Union[Sequence["SendableMessage"], "SendableMessage"],
        reply_to: Optional[str],
        headers: Optional["AnyDict"],
        correlation_id: str,
    ) -> bytes:
        msg = cls.build(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
        )

        # body and headers are JSON-encoded together into a single envelope
        return dump_json(
            {
                "data": msg.data,
                "headers": msg.headers,
            }
        )

In principle msg.data can be bytes, but in practice it has to be valid UTF-8, otherwise encoding raises an exception:

/.../.venv/lib/python3.12/site-packages/faststream/redis/publisher/producer.py:79 in publish

     76             psub = self._connection.pubsub()
     77             await psub.subscribe(reply_to)
     78
  ❱  79         msg = RawMessage.encode(
     80             message=message,
     81             reply_to=reply_to,
     82             headers=headers,

  locals:
    channel = None
    correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43'
    headers = None
    list = 'job-queue'
    maxlen = None
    message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far'
    psub = None
    raise_timeout = False
    reply_to = ''
    rpc = False
    rpc_timeout = 30.0
    self = <faststream.redis.publisher.producer.RedisFastProducer object at 0x1137972f0>
    stream = None

/.../.venv/lib/python3.12/site-packages/faststream/redis/parser.py:101 in encode

     98             correlation_id=correlation_id,
     99         )
    100
  ❱ 101         return dump_json(
    102             {
    103                 "data": msg.data,
    104                 "headers": msg.headers,

  locals:
    correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43'
    headers = None
    message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far'
    msg = <faststream.redis.parser.RawMessage object at 0x10e69c790>
    reply_to = ''

/.../.venv/lib/python3.12/site-packages/faststream/_compat.py:93 in dump_json

     90         return to_jsonable_python(model, **kwargs)
     91
     92     def dump_json(data: Any) -> bytes:
  ❱  93         return json_dumps(model_to_jsonable(data))
     94
     95     def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:
     96         return model.model_fields

  locals:
    data = {
        'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far',
        'headers': {
            'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43'
        }
    }

/.../.venv/lib/python3.12/site-packages/faststream/_compat.py:90 in model_to_jsonable

     87         model: BaseModel,
     88         **kwargs: Any,
     89     ) -> Any:
  ❱  90         return to_jsonable_python(model, **kwargs)
     91
     92     def dump_json(data: Any) -> bytes:
     93         return json_dumps(model_to_jsonable(data))

  locals:
    kwargs = {}
    model = {
        'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far',
        'headers': {
            'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43'
        }
    }

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x82 in position 0: invalid utf-8
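The failure can be reproduced with the standard library alone. The sketch below uses a hypothetical `encode_envelope` helper as a stand-in for the JSON envelope step (it is not FastStream's code): JSON has no binary type, so the body has to become text first, and this sketch assumes strict UTF-8 decoding. A plain JSON body passes, while the msgpack body from the traceback fails on its first byte, 0x82 (a msgpack header for a two-entry map), which is a UTF-8 continuation byte and cannot start a character.

```python
import json

# The msgpack-encoded body shown in the traceback's `message` local
MSGPACK_BODY = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"
# A UTF-8-compatible body (plain JSON text) for comparison
JSON_BODY = b'{"url": "http://kelly.com/", "type": "far"}'


def encode_envelope(data: bytes, headers: dict) -> bytes:
    """Hypothetical stand-in for a JSON envelope encoder.

    JSON has no bytes type, so the body is decoded as strict UTF-8
    before being embedded next to the headers.
    """
    return json.dumps({"data": data.decode("utf-8"), "headers": headers}).encode("utf-8")


def try_encode(data: bytes) -> str:
    """Return 'ok' if the body fits in the envelope, else report the bad byte."""
    try:
        encode_envelope(data, {"correlation_id": "2ac5886a-736d-4712-b878-448b8b041f43"})
        return "ok"
    except UnicodeDecodeError as exc:
        # exc.start is the index of the first byte that could not be decoded
        return f"failed at byte {exc.start}: {data[exc.start]:#x}"


print(try_encode(JSON_BODY))     # ok
print(try_encode(MSGPACK_BODY))  # failed at byte 0: 0x82
```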

Here's some test code:

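import ormsgpack
import tqdm
from faker import Faker
from pydantic import BaseModel
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")  # assumed local Redis URL
app = FastStream(broker)


class Request(BaseModel):  # fields inferred from the packed payload above
    url: str
    type: str


@app.after_startup
async def startup():
    faker = Faker("en_US")
    urls = [faker.url() for _ in range(10)]

    for url in tqdm.tqdm(urls):
        obj = Request(url=url, type=faker.word())
        # ormsgpack output is raw binary (not valid UTF-8), so this publish
        # fails inside RawMessage.encode with the UnicodeDecodeError above
        await broker.publish(
            ormsgpack.packb(obj, option=ormsgpack.OPT_SERIALIZE_PYDANTIC),
            list="job-queue",
        )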

The error occurs on the producer side. The only workaround I've found so far is to monkey-patch the RawMessage.encode method at runtime so that the msgpack serialization happens in the final message-encoding phase. That complicates the parsing side, though, because it breaks normal message parsing (e.g. extracting the message headers, correlation_id, etc.).

Any suggestions? Perhaps there need to be additional middleware hooks for the final message serialization and the initial message deserialization, so that serialization formats producing binary data that isn't UTF-8 compatible are supported?
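To make the proposed hooks more concrete, here is a minimal sketch of a binary-safe envelope that such a final-serialization hook could produce. The layout is a made-up assumption, not anything FastStream implements: a 4-byte big-endian length prefix, the headers as UTF-8 JSON, then the raw body bytes. `pack_frame` and `unpack_frame` are hypothetical names. Only the headers go through JSON, so the body never has to be valid UTF-8.

```python
import json
import struct

# 4-byte big-endian unsigned length prefix for the header block
HEADER_LEN = struct.Struct(">I")


def pack_frame(body: bytes, headers: dict) -> bytes:
    """Hypothetical binary-safe envelope: [header length][headers JSON][raw body]."""
    header_bytes = json.dumps(headers).encode("utf-8")
    return HEADER_LEN.pack(len(header_bytes)) + header_bytes + body


def unpack_frame(frame: bytes) -> tuple[bytes, dict]:
    """Inverse of pack_frame: read the prefix, then the headers, then the body."""
    (header_len,) = HEADER_LEN.unpack_from(frame, 0)
    start = HEADER_LEN.size
    headers = json.loads(frame[start:start + header_len])
    body = frame[start + header_len:]  # body bytes are returned untouched
    return body, headers


MSGPACK_BODY = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"
frame = pack_frame(MSGPACK_BODY, {"correlation_id": "abc"})
body, headers = unpack_frame(frame)
print(body == MSGPACK_BODY, headers)
```

The trade-off versus the current JSON envelope is that the stored message is no longer a plain JSON document, so every consumer has to understand the frame layout.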

The only other option I can think of is base64-encoding the binary payload before the message is serialized, which would partly defeat the space-saving purpose of using a binary format, since base64 output is about a third larger than its input.
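For comparison, the base64 workaround is simple to sketch. This assumes the same `{"data": ..., "headers": ...}` JSON envelope that `RawMessage.encode` builds; the helper names are hypothetical. It round-trips the 32-byte msgpack body from the traceback and shows the size cost: base64 emits 4 output characters for every 3 input bytes, rounded up.

```python
import base64
import json

MSGPACK_BODY = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"


def encode_b64_envelope(data: bytes, headers: dict) -> bytes:
    # base64 output is pure ASCII, so it always survives the JSON/UTF-8 step
    text = base64.b64encode(data).decode("ascii")
    return json.dumps({"data": text, "headers": headers}).encode("utf-8")


def decode_b64_envelope(raw: bytes) -> tuple[bytes, dict]:
    # Parse the JSON envelope, then restore the original binary body
    envelope = json.loads(raw)
    return base64.b64decode(envelope["data"]), envelope["headers"]


raw = encode_b64_envelope(MSGPACK_BODY, {"correlation_id": "abc"})
body, headers = decode_b64_envelope(raw)
assert body == MSGPACK_BODY

encoded_len = len(base64.b64encode(MSGPACK_BODY))
print(len(MSGPACK_BODY), encoded_len)  # 32 44
```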

Related: #1255

@chrisgoddard added the bug (Something isn't working) label on Feb 5, 2025
@Lancetnik
Member

You can pass the standard redis-py connection options directly to the broker constructor to control this behavior:
https://github.com/airtai/faststream/blob/main/faststream/redis/broker/broker.py#L113-L117

@Lancetnik added the Redis (Issues related to the `faststream.redis` module and Redis features) label on Feb 5, 2025
Development

No branches or pull requests

2 participants