diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/message.py b/pubsub/google/cloud/pubsub_v1/subscriber/message.py index d24161e853f4..091826007ae3 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/message.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/message.py @@ -14,10 +14,12 @@ from __future__ import absolute_import +import datetime import json import math import time +from google.api_core import datetime_helpers from google.cloud.pubsub_v1.subscriber._protocol import requests @@ -151,7 +153,11 @@ def publish_time(self): Returns: datetime: The date and time that the message was published. """ - return self._message.publish_time + timestamp = self._message.publish_time + delta = datetime.timedelta( + seconds=timestamp.seconds, + microseconds=timestamp.nanos // 1000) + return datetime_helpers._UTC_EPOCH + delta @property def size(self): diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py index 431d39bb6afc..4089d4d5109d 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py @@ -12,25 +12,40 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import time import mock +import pytz from six.moves import queue +from google.protobuf import timestamp_pb2 +from google.api_core import datetime_helpers from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber import message from google.cloud.pubsub_v1.subscriber._protocol import requests +RECEIVED = datetime.datetime(2012, 4, 21, 15, 0, tzinfo=pytz.utc) +RECEIVED_SECONDS = datetime_helpers.to_milliseconds(RECEIVED) // 1000 +PUBLISHED_MICROS = 123456 +PUBLISHED = RECEIVED + datetime.timedelta( + days=1, microseconds=PUBLISHED_MICROS) +PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000 + + def create_message(data, ack_id='ACKID', **attrs): with mock.patch.object(message.Message, 'lease') as lease: with mock.patch.object(time, 'time') as time_: - time_.return_value = 1335020400 + time_.return_value = RECEIVED_SECONDS msg = message.Message(types.PubsubMessage( attributes=attrs, data=data, message_id='message_id', - publish_time=types.Timestamp(seconds=1335020400 - 86400), + publish_time=timestamp_pb2.Timestamp( + seconds=PUBLISHED_SECONDS, + nanos=PUBLISHED_MICROS * 1000, + ), ), ack_id, queue.Queue()) lease.assert_called_once_with() return msg @@ -48,7 +63,7 @@ def test_data(): def test_publish_time(): msg = create_message(b'foo') - assert msg.publish_time == types.Timestamp(seconds=1335020400 - 86400) + assert msg.publish_time == PUBLISHED def check_call_types(mock, *args, **kwargs): @@ -80,7 +95,7 @@ def test_ack(): msg.ack() put.assert_called_once_with(requests.AckRequest( ack_id='bogus_ack_id', - byte_size=25, + byte_size=30, time_to_ack=mock.ANY, )) check_call_types(put, requests.AckRequest) @@ -92,7 +107,7 @@ def test_drop(): msg.drop() put.assert_called_once_with(requests.DropRequest( ack_id='bogus_ack_id', - byte_size=25, + byte_size=30, )) check_call_types(put, requests.DropRequest) @@ -103,7 +118,7 @@ def test_lease(): msg.lease() put.assert_called_once_with(requests.LeaseRequest( ack_id='bogus_ack_id', - byte_size=25, + byte_size=30, )) check_call_types(put, requests.LeaseRequest) @@ -125,7 +140,7 @@ def test_nack(): msg.nack() put.assert_called_once_with(requests.NackRequest( ack_id='bogus_ack_id', - byte_size=25, + byte_size=30, )) check_call_types(put, requests.NackRequest)