Timeout on write_item? #4

Closed

jmacglashan opened this issue Jun 18, 2020 · 3 comments

jmacglashan commented Jun 18, 2020

I'm exploring Reverb and it looks like a pretty cool framework that solves a lot of problems.

One thing I'm unsure how to do (or whether it's a feature that should be added) is how to time out a Writer.create_item call.

To give some motivation, suppose I have a remote trainer that is also hosting the Reverb server, and a worker that is writing to this remote Reverb server using a Writer and its create_item method. Now suppose the trainer crashes for some reason (or even just ends gracefully), taking down the Reverb server with it. It seems that right now the worker will just be blocked indefinitely on create_item, when what I want is for it to time out and die gracefully itself.

To give an example of this behavior, run the following "server" script in one shell:

import reverb
import tensorflow as tf
import time


def main():
    server = reverb.Server(  # noqa
        tables=[
            reverb.Table(
                name='my_table',
                sampler=reverb.selectors.Uniform(),
                remover=reverb.selectors.Fifo(),
                max_size=1000,
                rate_limiter=reverb.rate_limiters.MinSize(1)),
        ],
        port=8000
    )

    time.sleep(10.)  # die after 10 seconds
    print('Server closing')


if __name__ == '__main__':
    main()

In another shell, run this "worker" script:

import reverb
import tensorflow as tf
import time


def main():
    client = reverb.Client('localhost:8000')

    with client.writer(max_sequence_length=2) as writer:
        for i in range(1000):
            writer.append(tf.constant([1.]))
            if i > 0:
                print(f'creating item for i={i}')
                writer.create_item('my_table', 2, 1.0)
                print(f'wrote item for i={i}')
                time.sleep(0.5)

    print('closing')



if __name__ == '__main__':
    main()

What I find is that when the server goes down after its 10-second lifetime, the worker blocks indefinitely on the create_item call.

So is there a way to time this out? The only workaround I found was to first check with the server_info method, which does have a timeout argument you can specify, and then catch the error. E.g., you can modify the worker like this:

def main():
    client = reverb.Client('localhost:8000')

    with client.writer(max_sequence_length=2) as writer:
        for i in range(1000):
            writer.append(tf.constant([1.]))
            if i > 0:
                print(f'creating item for i={i}')
                try:
                    client.server_info(timeout=5)
                except Exception:
                    break
                writer.create_item('my_table', 2, 1.0)
                print(f'wrote item for i={i}')
                time.sleep(0.5)

    print('closing')

This seems a bit cumbersome though: I'm not sure how much overhead that extra method call introduces, and it can still fail if the server dies right after the server_info call but before the create_item call (even though that may be rare).

Is there a more appropriate way to handle this? Or could a timeout arg be added to the create_item method?

Thanks!

acassirer (Contributor) commented

Hey,

Sorry for the slow response!

I'm afraid there currently isn't any timeout support in the writer. We have a backlog entry for the feature but haven't gotten around to implementing it yet, as this isn't much of an issue for our own experiments, where failing tasks are automatically restarted (and thus the blocking makes sense, as the call eventually succeeds).

I'll bump up the priority on this feature but if this is causing you a lot of pain then maybe you could try offloading the call to a threadpool? You could then apply a timeout on the future instead.

Something along these lines maybe:

from concurrent import futures
import time

import reverb
import tensorflow as tf

_TIMEOUT = 10  # Seconds.


def main():
    client = reverb.Client('localhost:8000')

    executor = futures.ThreadPoolExecutor(max_workers=1)
    with client.writer(max_sequence_length=2) as writer:
        for i in range(1000):
            try:
                # In general writer.append can also block due to communication,
                # so offload it to the executor as well.
                executor.submit(writer.append, tf.constant([1.])).result(_TIMEOUT)
                if i > 0:
                    print(f'creating item for i={i}')
                    executor.submit(
                        writer.create_item, 'my_table', 2, 1.0).result(_TIMEOUT)
                    print(f'wrote item for i={i}')
                    time.sleep(0.5)
            except futures.TimeoutError:
                print('closing')
                break

jmacglashan (Author) commented

Okay, thanks! That might be a reasonable workaround for now.

ebrevdo (Collaborator) commented Apr 12, 2021

This is fixed in the new TrajectoryWriter flush call (create_item is now asynchronous in the new API).
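
For anyone landing here later, a rough sketch of the worker on the new API might look like the following. This is an illustration, not code from the thread: it assumes the TrajectoryWriter interface (client.trajectory_writer, writer.history for trajectory references, and flush raising reverb.DeadlineExceededError on timeout); the table name and sequence length are carried over from the scripts above, and exact names may vary by version.

import numpy as np
import reverb


def main():
    client = reverb.Client('localhost:8000')

    # trajectory_writer is the successor to client.writer.
    with client.trajectory_writer(num_keep_alive_refs=2) as writer:
        for i in range(1000):
            writer.append({'obs': np.array([1.0])})
            if i > 0:
                # create_item no longer blocks; it just queues the item.
                writer.create_item(
                    table='my_table',
                    priority=1.0,
                    trajectory=writer.history['obs'][-2:])
                try:
                    # flush blocks until queued items have been written,
                    # or raises if the deadline passes first.
                    writer.flush(timeout_ms=5000)
                except reverb.DeadlineExceededError:
                    print('server unreachable, closing')
                    break


if __name__ == '__main__':
    main()

Since create_item returns immediately in the new API, the timeout lives on flush, so a single deadline covers all pending writes rather than needing one per call.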
