
How can I change the mini-batch size when using the tfio.experimental.streaming.KafkaBatchIODataset API? #1458

Closed
TTian-vivo opened this issue Jun 16, 2021 · 12 comments

Comments

@TTian-vivo

How can I change the mini-batch size when using the tfio.experimental.streaming.KafkaBatchIODataset API? I increased the configuration params tenfold (batch.size, batch.num.messages, message.max.bytes) simultaneously, but the mini-batch size is still 1024.
P.S.: tensorflow version 2.5.0; tensorflow-io version 0.16.0

@kvignesh1420
Member

@TTian-vivo as of now it is restricted to 1024. Let me change it so that it takes the value of the max.poll.records configuration option instead. Will that be fine?

@TTian-vivo
Author

OK, thanks. I think this value (the mini-batch size) needs to be adjustable.

@kvignesh1420
Member

@TTian-vivo I have raised a PR (#1460) to address this. It enables you to pass:

batch.num.messages=xxxx

to the configuration options, so that the maximum size of each mini-batch/mini-dataset while streaming will be equal to xxxx.
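For illustration, here is how the option slots into the usual configuration list (a sketch; the 4096 is a placeholder for the "xxxx" above, and the other option names are the ones used later in this thread):

```python
# Kafka configuration strings for KafkaBatchIODataset; batch.num.messages
# caps how many records each streamed mini-dataset may contain.
configuration = [
    "session.timeout.ms=7000",
    "max.poll.interval.ms=8000",
    "auto.offset.reset=earliest",
    "batch.num.messages=4096",  # placeholder value standing in for "xxxx"
]
```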

@TTian-vivo
Author

OK, thanks. And which configuration option sets the smallest mini-batch size? I think increasing the batch size can speed up model training and make convergence easier. I don't know if my idea is correct?

@kvignesh1420
Member

@TTian-vivo you can set batch.num.messages to an estimate (say, 2048, based on network connectivity with Kafka) and then train your model in the following manner:

import tensorflow as tf
import tensorflow_io as tfio

# Prepare the dataset
dataset = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["mini-batch-test"],
    group_id="cgminibatchtrain",
    servers=None,
    stream_timeout=5000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
        "batch.num.messages=2048",
    ],
)

# Prepare the model
NUM_COLUMNS = 1
model = tf.keras.Sequential(
    [
        tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
        tf.keras.layers.Dense(4, activation="relu"),
        tf.keras.layers.Dropout(0.1),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ]
)
model.compile(
    optimizer="adam",
    # The final layer already applies a sigmoid, so the loss
    # must receive probabilities, not logits.
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=["accuracy"],
)

# Train the model
TRAINING_BATCH_SIZE = 64
for mini_d in dataset:
    mini_d = mini_d.map(
        lambda m, k: (
            tf.strings.to_number(m, out_type=tf.float32),
            tf.strings.to_number(k, out_type=tf.float32),
        )
    ).batch(TRAINING_BATCH_SIZE)
    # Fit the model as long as the data keeps streaming
    model.fit(mini_d, epochs=5)

In the training loop, every mini_d is a tf.data.Dataset whose size is capped at batch.num.messages (i.e. 2048). This size is only relevant for fetching data from Kafka in chunks. For training the model, however, you have to re-batch with mini_d.batch() as shown above. Since TRAINING_BATCH_SIZE is a hyper-parameter, you may have to tune it to find the optimal value.
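The distinction between the two batch sizes can be made concrete without Kafka at all. A minimal plain-Python sketch, using the sizes from the example above (the variable names are illustrative, not part of the tfio API):

```python
# Each streamed mini-dataset holds up to batch.num.messages records (2048 here);
# .batch(TRAINING_BATCH_SIZE) then splits it into training batches of 64.
FETCH_CHUNK_SIZE = 2048      # analogous to batch.num.messages
TRAINING_BATCH_SIZE = 64     # the Keras-level hyper-parameter

records = list(range(FETCH_CHUNK_SIZE))  # one fetched mini-dataset, as plain ints

training_batches = [
    records[i:i + TRAINING_BATCH_SIZE]
    for i in range(0, len(records), TRAINING_BATCH_SIZE)
]

print(len(training_batches))     # 32 training steps per fetched chunk
print(len(training_batches[0]))  # 64 records per gradient update
```

So one fetched chunk of 2048 records yields 32 gradient updates of 64 records each; raising batch.num.messages changes only how much data arrives per fetch, not the training batch size.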

@TTian-vivo
Author

TTian-vivo commented Jun 17, 2021

Sorry, you didn't understand what I meant.
I want TRAINING_BATCH_SIZE=2048, or other values greater than 1024 (in your example, TRAINING_BATCH_SIZE = 64). Even with batch.num.messages=2048, the size of mini_d is still 1024.

@kvignesh1420
Member

@TTian-vivo the PR has not been merged yet. Will try to get it merged soon.

@TTian-vivo
Author

OK, looking forward to this.

@kvignesh1420
Member

@TTian-vivo the PR has been merged, you can install the tensorflow-io-nightly package via pip and give it a try. Let me know.

@TTian-vivo
Author

What is the difference between the tensorflow-io-nightly and tensorflow-io packages? I tried the tensorflow-io package, and the problem is solved.

@kvignesh1420
Member

@TTian-vivo the tensorflow-io package indicates the latest stable release (i.e. 0.19.0 as of now) of tensorflow-io. Whereas, the tensorflow-io-nightly package is the up-to-date packaged version of the master branch of tensorflow-io. (This is not guaranteed to be stable, as changes made to the master branch might break a few things.)

So in your case, when this issue was created, the stable release was 0.18.0. After PR #1460 was merged into the master branch, the tensorflow-io-nightly package contained the fix. Thus, I suggested you use the nightly release. However, since we recently released 0.19.0 as the stable version, the fix is now available in the stable release as well.

Hope this information helps. Closing the issue for now. Thanks!
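For reference, switching between the two packages looks like the following (a sketch of the install commands; the version pin is optional and reflects the release mentioned above):

```shell
# Stable release (0.19.0 at the time of writing) -- now includes the fix:
pip install --upgrade tensorflow-io

# Nightly build of the master branch -- useful before a fix reaches a release;
# uninstall the stable package first to avoid mixing the two:
pip uninstall -y tensorflow-io
pip install tensorflow-io-nightly
```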

@TTian-vivo
Author

OK! Thanks! It worked! Closing this issue now!
