diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py
index 4dc6ceec6a80..75f430b09421 100644
--- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py
+++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py
@@ -75,9 +75,12 @@ def messages(self):
     def size(self):
         """Return the total size of all of the messages currently in the batch.
 
+        The size includes any overhead of the actual ``PublishRequest`` that is
+        sent to the backend.
+
         Returns:
             int: The total size of all of the messages currently
-                 in the batch, in bytes.
+                 in the batch (including the request overhead), in bytes.
         """
         raise NotImplementedError
 
diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
index 726e93166cda..4101bc518b0a 100644
--- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
+++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -29,6 +29,7 @@
 
 _LOGGER = logging.getLogger(__name__)
 _CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
+_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000  # max accepted size of PublishRequest
 
 
 class Batch(base.Batch):
@@ -79,13 +80,17 @@ def __init__(self, client, topic, settings, autocommit=True):
         # in order to avoid race conditions
         self._futures = []
         self._messages = []
-        self._size = 0
         self._status = base.BatchStatus.ACCEPTING_MESSAGES
 
+        # The initial size is not zero, we need to account for the size overhead
+        # of the PublishRequest message itself.
+        self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
+        self._size = self._base_request_size
+
         # If max latency is specified, start a thread to monitor the batch and
         # commit when the max latency is reached.
         self._thread = None
-        if autocommit and self._settings.max_latency < float("inf"):
+        if autocommit and self.settings.max_latency < float("inf"):
             self._thread = threading.Thread(
                 name="Thread-MonitorBatchPublisher", target=self.monitor
             )
@@ -124,9 +129,12 @@ def settings(self):
     def size(self):
         """Return the total size of all of the messages currently in the batch.
 
+        The size includes any overhead of the actual ``PublishRequest`` that is
+        sent to the backend.
+
         Returns:
             int: The total size of all of the messages currently
-                 in the batch, in bytes.
+                 in the batch (including the request overhead), in bytes.
         """
         return self._size
 
@@ -251,14 +259,14 @@ def _commit(self):
     def monitor(self):
         """Commit this batch after sufficient time has elapsed.
 
-        This simply sleeps for ``self._settings.max_latency`` seconds,
+        This simply sleeps for ``self.settings.max_latency`` seconds,
         and then calls commit unless the batch has already been committed.
         """
         # NOTE: This blocks; it is up to the calling code to call it
         #       in a separate thread.
 
         # Sleep for however long we should be waiting.
-        time.sleep(self._settings.max_latency)
+        time.sleep(self.settings.max_latency)
 
         _LOGGER.debug("Monitor is waking up")
         return self._commit()
@@ -281,6 +289,10 @@ def publish(self, message):
             the :class:`~concurrent.futures.Future` interface or :data:`None`.
             If :data:`None` is returned, that signals that the batch cannot
             accept a message.
+
+        Raises:
+            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
+                the ``message`` would exceed the max size limit on the backend.
         """
         # Coerce the type, just in case.
         if not isinstance(message, types.PubsubMessage):
@@ -292,12 +304,21 @@ def publish(self, message):
             if not self.will_accept(message):
                 return future
 
-            new_size = self._size + message.ByteSize()
+            size_increase = types.PublishRequest(messages=[message]).ByteSize()
+
+            if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
+                err_msg = (
+                    "The message being published would produce too large a publish "
+                    "request that would exceed the maximum allowed size on the "
+                    "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
+                )
+                raise exceptions.MessageTooLargeError(err_msg)
+
+            new_size = self._size + size_increase
             new_count = len(self._messages) + 1
-            overflow = (
-                new_size > self.settings.max_bytes
-                or new_count >= self._settings.max_messages
-            )
+
+            size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
+            overflow = new_size > size_limit or new_count >= self.settings.max_messages
 
             if not self._messages or not overflow:
 
diff --git a/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py b/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py
index adbfaaaa1ee1..be176bac2dba 100644
--- a/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py
+++ b/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py
@@ -22,4 +22,8 @@ class PublishError(GoogleAPICallError):
     pass
 
 
-__all__ = ("PublishError", "TimeoutError")
+class MessageTooLargeError(ValueError):
+    """Attempt to publish a message that would exceed the server max size limit."""
+
+
+__all__ = ("MessageTooLargeError", "PublishError", "TimeoutError")
diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py
index 7f833660f6e2..2d238b42f797 100644
--- a/pubsub/google/cloud/pubsub_v1/types.py
+++ b/pubsub/google/cloud/pubsub_v1/types.py
@@ -48,7 +48,9 @@
     BatchSettings.__doc__ = "The settings for batch publishing the messages."
     BatchSettings.max_bytes.__doc__ = (
         "The maximum total size of the messages to collect before automatically "
-        "publishing the batch."
+        "publishing the batch, including any byte size overhead of the publish "
+        "request itself. The maximum value is bound by the server-side limit of "
+        "10_000_000 bytes."
     )
     BatchSettings.max_latency.__doc__ = (
         "The maximum number of seconds to wait for additional messages before "
diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py
index 59e5e3fe83a4..65baaf016407 100644
--- a/pubsub/tests/system.py
+++ b/pubsub/tests/system.py
@@ -74,24 +74,52 @@ def cleanup():
 
 
 def test_publish_messages(publisher, topic_path, cleanup):
-    futures = []
     # Make sure the topic gets deleted.
     cleanup.append((publisher.delete_topic, topic_path))
 
     publisher.create_topic(topic_path)
-    for index in six.moves.range(500):
-        futures.append(
-            publisher.publish(
-                topic_path,
-                b"The hail in Wales falls mainly on the snails.",
-                num=str(index),
-            )
+
+    futures = [
+        publisher.publish(
+            topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i)
         )
+        for i in six.moves.range(500)
+    ]
+
     for future in futures:
         result = future.result()
         assert isinstance(result, six.string_types)
 
 
+def test_publish_large_messages(publisher, topic_path, cleanup):
+    # Make sure the topic gets deleted.
+    cleanup.append((publisher.delete_topic, topic_path))
+
+    # Each message should be smaller than 10**7 bytes (the server side limit for
+    # PublishRequest), but all messages combined in a PublishRequest should
+    # slightly exceed that threshold to make sure the publish code handles these
+    # cases well.
+    # Mind that the total PublishRequest size must still be smaller than
+    # 10 * 1024 * 1024 bytes in order to not exceed the max request body size limit.
+    msg_data = b"x" * (2 * 10 ** 6)
+
+    publisher.batch_settings = types.BatchSettings(
+        max_bytes=11 * 1000 * 1000,  # more than the server limit of 10 ** 7
+        max_latency=2.0,  # so that autocommit happens after publishing all messages
+        max_messages=100,
+    )
+    publisher.create_topic(topic_path)
+
+    futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)]
+
+    # If the publishing logic correctly split all messages into more than a
+    # single batch despite a high BatchSettings.max_bytes limit, there should
+    # be no "InvalidArgument: request_size is too large" error.
+    for future in futures:
+        result = future.result(timeout=10)
+        assert isinstance(result, six.string_types)  # the message ID
+
+
 def test_subscribe_to_messages(
     publisher, topic_path, subscriber, subscription_path, cleanup
 ):
diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
index 60425e748043..f51b314af6df 100644
--- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
+++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
@@ -34,13 +34,15 @@ def create_client():
     return publisher.Client(credentials=creds)
 
 
-def create_batch(autocommit=False, **batch_settings):
+def create_batch(autocommit=False, topic="topic_name", **batch_settings):
     """Return a batch object suitable for testing.
 
     Args:
         autocommit (bool): Whether the batch should commit after
             ``max_latency`` seconds. By default, this is ``False``
             for unit testing.
+        topic (str): The name of the topic the batch should publish
+            the messages to.
         batch_settings (dict): Arguments passed on to the
             :class:``~.pubsub_v1.types.BatchSettings`` constructor.
 
@@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings):
     """
     client = create_client()
     settings = types.BatchSettings(**batch_settings)
-    return Batch(client, "topic_name", settings, autocommit=autocommit)
+    return Batch(client, topic, settings, autocommit=autocommit)
 
 
 def test_init():
@@ -299,8 +301,8 @@ def test_monitor_already_committed():
     assert batch._status == status
 
 
-def test_publish():
-    batch = create_batch()
+def test_publish_updating_batch_size():
+    batch = create_batch(topic="topic_foo")
     messages = (
         types.PubsubMessage(data=b"foobarbaz"),
         types.PubsubMessage(data=b"spameggs"),
@@ -314,22 +316,27 @@ def test_publish():
     assert len(batch.messages) == 3
     assert batch._futures == futures
 
-    # The size should have been incremented by the sum of the size of the
-    # messages.
-    expected_size = sum([message_pb.ByteSize() for message_pb in messages])
-    assert batch.size == expected_size
+    # The size should have been incremented by the sum of the size
+    # contributions of each message to the PublishRequest.
+    base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
+    expected_request_size = base_request_size + sum(
+        types.PublishRequest(messages=[msg]).ByteSize() for msg in messages
+    )
+
+    assert batch.size == expected_request_size
     assert batch.size > 0  # I do not always trust protobuf.
 
 
 def test_publish_not_will_accept():
-    batch = create_batch(max_messages=0)
+    batch = create_batch(topic="topic_foo", max_messages=0)
+    base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
 
     # Publish the message.
     message = types.PubsubMessage(data=b"foobarbaz")
     future = batch.publish(message)
 
     assert future is None
-    assert batch.size == 0
+    assert batch.size == base_request_size
     assert batch.messages == []
     assert batch._futures == []
 
@@ -361,6 +368,47 @@ def test_publish_exceed_max_messages():
         assert batch._futures == futures
 
 
+@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
+def test_publish_single_message_size_exceeds_server_size_limit():
+    batch = create_batch(
+        topic="topic_foo",
+        max_messages=1000,
+        max_bytes=1000 * 1000,  # way larger than (mocked) server side limit
+    )
+
+    big_message = types.PubsubMessage(data=b"x" * 984)
+
+    request_size = types.PublishRequest(
+        topic="topic_foo", messages=[big_message]
+    ).ByteSize()
+    assert request_size == 1001  # sanity check, just above the (mocked) server limit
+
+    with pytest.raises(exceptions.MessageTooLargeError):
+        batch.publish(big_message)
+
+
+@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
+def test_publish_total_messages_size_exceeds_server_size_limit():
+    batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500)
+
+    messages = (
+        types.PubsubMessage(data=b"x" * 500),
+        types.PubsubMessage(data=b"x" * 600),
+    )
+
+    # Sanity check - request size is still below BatchSettings.max_bytes,
+    # but it exceeds the server-side size limit.
+    request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize()
+    assert 1000 < request_size < 1500
+
+    with mock.patch.object(batch, "commit") as fake_commit:
+        batch.publish(messages[0])
+        batch.publish(messages[1])
+
+    # The server side limit should kick in and cause a commit.
+    fake_commit.assert_called_once()
+
+
 def test_publish_dict():
     batch = create_batch()
     future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}})