pgpubsub falls far behind after a few hours #19

Closed
metalaureate opened this issue Jan 19, 2023 · 8 comments

@metalaureate

Thanks again for your package. I'm currently using it to emit websocket messages on db updates for a news feed, but I notice it falls behind the db updates by up to 20 to 30 minutes over several hours (e.g. if I stop the feed row insertion, the listener takes an additional 20 to 30 minutes to clear its backlog). The listener handler takes less than 100ms to run, but over many hours the notifications fall behind. When I terminate the process, I get a very odd output showing a dozen or more processes terminating, not just the usual manage.py listen process, which makes me wonder what is going on. Sorry there's not much information to go on here; I'm happy to provide more. I've added some extra logging to try to see what's causing it.

@metalaureate
Author

Something possibly related: this issue started happening after I added a listener on a new model whose foreign key uses a custom db column name, which I defined to avoid column conflicts.

`Encountered exception HomeFeed() got an unexpected keyword argument 'collection_fk'`

The field is defined as:

```python
collection_ref = models.ForeignKey(
    Collection, on_delete=models.CASCADE,
    db_column='collection_fk', null=True, blank=True,
)
```

Is it possible that pgpubsub isn't handling that custom db column name and is raising an exception that regenerates the process without killing the old one, gradually accumulating duplicate processes over time, which would explain my slowdown?
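For what it's worth, a minimal sketch of why that custom column name could trip up a model constructor (model names are taken from this thread; everything else is assumed): Django model constructors accept field names as keywords, not `db_column` names.

```python
# In an installed Django app (models.py):
from django.db import models

class Collection(models.Model):
    pass

class HomeFeed(models.Model):
    # The Python attribute is collection_ref; collection_fk only names
    # the underlying database column.
    collection_ref = models.ForeignKey(
        Collection, on_delete=models.CASCADE,
        db_column='collection_fk', null=True, blank=True,
    )

# HomeFeed(collection_ref=some_collection)  # fine: keyword is the field name
# HomeFeed(collection_fk=1)                 # TypeError, matching the error quoted above
```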

I am going to refactor my model to avoid this foreign key rename to see if that makes the problem go away.

@PaulGilmartin
Owner

@metalaureate That's a strange one. I don't have time right now to do a full investigation, but here are a couple of things I'm wondering about.

  1. Where did the exception `HomeFeed() got an unexpected keyword argument 'collection_fk'` actually come from? What is `HomeFeed` here? Was this an error raised by pgpubsub? A minimal example of your pgpubsub-related objects/code would be helpful here.

  2. Regarding your comment about exceptions leaving extra processes hanging around - this is the source code for the listen function:

```python
def listen(
    channels: Union[List[BaseChannel], List[str]] = None,
    recover: bool = False,
    poll_count: Union[None, int] = None,
):
    multiprocessing.set_start_method('fork', force=True)
    pg_connection = listen_to_channels(channels)
    if recover:
        process_stored_notifications(channels)
        process_notifications(pg_connection)
    poll_count = poll_count or float('inf')
    print('Listening for notifications...')
    while poll_count:
        # Wait up to 1s for activity on the connection.
        if select.select([pg_connection], [], [], 1) == ([], [], []):
            pass
        else:
            try:
                process_notifications(pg_connection)
            except Exception as e:
                print(f'Encountered exception {e}')
                print('Restarting process')
                # Spin up a replacement listener, then re-raise to end
                # this process.
                process = multiprocessing.Process(
                    target=listen, args=(channels,))
                process.start()
                raise
        poll_count -= 1
```
Notice that whenever an exception occurs while processing the Notification, we spin up a new process and then re-raise the exception. I believe the raise should be enough to kill the process in which the exception originally occurred, so I'm not sure how we could end up with processes being left lying around like that. Any thoughts on this?
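If it helps, here's a toy check (not pgpubsub code; all names are made up for illustration) that an unhandled raise does terminate a multiprocessing worker, while a process it spawned just beforehand carries on:

```python
import multiprocessing
import time

def replacement():
    # Stands in for the freshly spawned listener.
    time.sleep(0.5)
    print('replacement listener ran to completion')

def worker():
    # Spawn a stand-in "new listener", then die via an unhandled
    # exception, mirroring the except branch in listen above.
    multiprocessing.Process(target=replacement).start()
    raise RuntimeError('simulated notification failure')

if __name__ == '__main__':
    # 'fork' matches the start method forced in listen (Unix-only).
    multiprocessing.set_start_method('fork', force=True)
    w = multiprocessing.Process(target=worker)
    w.start()
    w.join()
    print('worker exitcode:', w.exitcode)  # non-zero: the raise ended it
```

One subtlety: Python joins non-daemon children at process shutdown, so the dying process will wait for its replacement before it fully exits. In this toy the replacement finishes quickly, so that's harmless.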

@metalaureate
Author

Thank you. I removed the model with the foreign key and the problem has gone away. There's no stack trace for the foreign key error, but it is being raised from my listeners.py file, which only contains a handler to relay the insert (shown below).

I'm closing the issue since I can't reproduce it without the custom db_column name on the foreign key and I can work around that.

```python
@pgpubsub.post_insert_listener(ComHomeFeedChannel)
def emit_homefeed_change(old: HomeFeed, new: HomeFeed):
    # relayMessage is the app's websocket relay helper.
    relayMessage('homefeed', 'ACTIVITY_UPDATE', {
        'collection_id': new.collection_id,
        'event_type': new.event_type,
        'image_url': new.image_url,
        'prev_val': new.prev_val,
        'cur_val': new.cur_val,
        'delta': new.delta,
        'pc_change': new.pc_change,
        'metadata': new.metadata,
    })
```

@PaulGilmartin
Owner

@metalaureate Hmm, ok. I'm now slightly worried that something isn't quite working in the interaction between exception handling and closing processes, but I'm not sure.

@PaulGilmartin
Owner

@metalaureate I'm able to recreate the issue of an exception being raised when the ForeignKey is given a custom db_column value. I've opened a new issue for that: #20.

One possible explanation for the performance degradation you experienced is that, when an exception occurs, the stored Notification object is not deleted (by design, so the message can be picked up later). If every single message failed due to this exception, the Notification table would quickly grow in size, and scans of that table would become correspondingly slower. This doesn't explain the extra processes you saw, however; I'm not sure what could be causing those.
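If you want to check whether that build-up is happening, a quick diagnostic sketch (this assumes the stored notifications are exposed via pgpubsub's Notification model; adjust to however your version names it):

```python
# Run in a Django shell: if handlers keep failing, this count grows
# steadily instead of draining back towards zero.
from pgpubsub.models import Notification

print(Notification.objects.count())
```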

@metalaureate
Author

Amazing you reproduced it! 💪

In my scenario the exceptions were infrequent and seemed to account for a total of about a dozen parallel processes (when I hit Ctrl+C in the terminal after this had been going on for a while, I would get a Process terminated error about a dozen times; I wish I had posted that output). I wonder if it's an artifact of my terminal environment (PyCharm, Mac M1) or of my Mac sleeping and waking up. 🤷 Sorry, not very helpful.

@PaulGilmartin
Owner

By associating a unique identifier with each process, I did indeed confirm that when an exception is raised, the process in which it occurred is safely closed while a replacement child process is spun up. So I think we can rule that possibility out as an explanation, at least.
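A sketch of roughly how such a check can look (illustrative only, not the actual test code; the generation counter is there purely so the demo terminates):

```python
import multiprocessing
import os
import uuid

def tagged_listener(generation=0):
    # Each listener gets a unique id, logged once on start and once on exit.
    ident = uuid.uuid4().hex[:8]
    print(f'listener {ident} started in pid {os.getpid()}')
    try:
        if generation < 2:  # bound the restart chain for the demo
            raise RuntimeError('simulated notification failure')
    except Exception:
        multiprocessing.Process(
            target=tagged_listener, args=(generation + 1,)).start()
        raise  # ends this listener; the finally below logs its exit
    finally:
        print(f'listener {ident} exiting from pid {os.getpid()}')

if __name__ == '__main__':
    multiprocessing.set_start_method('fork', force=True)
    p = multiprocessing.Process(target=tagged_listener)
    p.start()
    p.join()
```

Each identifier shows up exactly twice, once starting and once exiting, which is consistent with the failed process really being closed.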

@PaulGilmartin
Owner

Just in case you were interested, I explain over in #20 what's causing the bug you mentioned with the foreign key.
