Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-2855] nexmark python suite implement queries 0, 1, 2 and 9 #12427

Merged
merged 18 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Result of WinningBid transform."""
from __future__ import absolute_import

from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.testing.benchmarks.nexmark import nexmark_util
from apache_beam.testing.benchmarks.nexmark.models import nexmark_model


class AuctionBidCoder(FastCoder):
def _create_impl(self):
return AuctionBidCoderImpl()

def is_deterministic(self):
return True


class AuctionBid(object):
CODER = AuctionBidCoder()

def __init__(self, auction, bid):
self.auction = auction
self.bid = bid

def __repr__(self):
return nexmark_util.model_to_json(self)


class AuctionBidCoderImpl(coder_impl.StreamCoderImpl):
_auction_coder_impl = nexmark_model.AuctionCoderImpl()
_bid_coder_Impl = nexmark_model.BidCoderImpl()

def encode_to_stream(self, value, stream, nested):
self._auction_coder_impl.encode_to_stream(value.auction, stream, True)
self._bid_coder_Impl.encode_to_stream(value.bid, stream, True)

def decode_from_stream(self, stream, nested):
auction = self._auction_coder_impl.decode_from_stream(stream, True)
bid = self._bid_coder_Impl.decode_from_stream(stream, True)
return AuctionBid(auction, bid)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Result of Query2."""
from __future__ import absolute_import

from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.testing.benchmarks.nexmark import nexmark_util


class AuctionPriceCoder(FastCoder):
def _create_impl(self):
return AuctionPriceCoderImpl()

def is_deterministic(self):
return True


class AuctionPrice(object):
CODER = AuctionPriceCoder()

def __init__(self, auction, price):
self.auction = auction
self.price = price

def __repr__(self):
return nexmark_util.model_to_json(self)


class AuctionPriceCoderImpl(coder_impl.StreamCoderImpl):
_int_coder_impl = coder_impl.VarIntCoderImpl()

def encode_to_stream(self, value, stream, nested):
self._int_coder_impl.encode_to_stream(value.auction, stream, True)
self._int_coder_impl.encode_to_stream(value.price, stream, True)

def decode_from_stream(self, stream, nested):

auction = self._int_coder_impl.decode_from_stream(stream, True)
price = self._int_coder_impl.decode_from_stream(stream, True)
return AuctionPrice(auction, price)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Field names for de-serializing json representation of Models
"""


class FieldNames:
ID = 'id'
NAME = 'name'
EMAIL_ADDRESS = 'emailAddress'
CREDIT_CARD = 'creditCard'
CITY = "city"
STATE = "state"
DATE_TIME = "dateTime"
EXTRA = "extra"
ITEM_NAME = "itemName"
DESCRIPTION = "description"
INITIAL_BID = "initialBid"
RESERVE = "reserve"
EXPIRES = "expires"
SELLER = "seller"
CATEGORY = "category"
AUCTION = "auction"
BIDDER = "bidder"
PRICE = "price"
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,52 @@
- The bid on an item for auction (Bid).

"""
from __future__ import absolute_import

from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.testing.benchmarks.nexmark import nexmark_util


class PersonCoder(FastCoder):
def _create_impl(self):
return PersonCoderImpl()

def is_deterministic(self):
return True


class Person(object):
"Author of an auction or a bid."
CODER = PersonCoder()

def __init__(
self, id, name, email, credit_card, city, state, timestamp, extra=None):
self, id, name, email, credit_card, city, state, date_time, extra=None):
self.id = id
self.name = name
self.email = email # key
self.email_address = email # key
self.credit_card = credit_card
self.city = city
self.state = state
self.timestamp = timestamp
self.date_time = date_time
self.extra = extra

def __repr__(self):
return 'Person({id}, {email})'.format(
**{
'id': self.id, 'email': self.email
})
return nexmark_util.model_to_json(self)


class AuctionCoder(FastCoder):
def _create_impl(self):
return AuctionCoderImpl()

def is_deterministic(self):
return True


class Auction(object):
"Item for auction."
CODER = AuctionCoder()

def __init__(
self,
Expand All @@ -59,7 +80,7 @@ def __init__(
description,
initial_bid,
reserve_price,
timestamp,
date_time,
expires,
seller,
category,
Expand All @@ -68,32 +89,124 @@ def __init__(
self.item_name = item_name # key
self.description = description
self.initial_bid = initial_bid
self.reserve_price = reserve_price
self.timestamp = timestamp
self.reserve = reserve_price
self.date_time = date_time
self.expires = expires
self.seller = seller
self.category = category
self.extra = extra

def __repr__(self):
return 'Auction({id}, {item_name})'.format(
**{
'id': self.id, 'item_name': self.item_name
})
return nexmark_util.model_to_json(self)


class BidCoder(FastCoder):
def _create_impl(self):
return BidCoderImpl()

def is_deterministic(self):
return True


class Bid(object):
"A bid for an item for auction."
CODER = BidCoder()

def __init__(self, auction, bidder, price, timestamp, extra=None):
def __init__(self, auction, bidder, price, date_time, extra=None):
self.auction = auction # key
self.bidder = bidder
self.price = price
self.timestamp = timestamp
self.date_time = date_time
self.extra = extra

def __repr__(self):
return 'Bid({auction}, {bidder}, {price})'.format(
**{
'auction': self.auction, 'bidder': self.bidder, 'price': self.price
})
return nexmark_util.model_to_json(self)


class AuctionCoderImpl(coder_impl.StreamCoderImpl):
_int_coder_impl = coder_impl.VarIntCoderImpl()
_str_coder_impl = StrUtf8Coder().get_impl()
_time_coder_impl = coder_impl.TimestampCoderImpl()

def encode_to_stream(self, value, stream, nested):
self._int_coder_impl.encode_to_stream(value.id, stream, True)
self._str_coder_impl.encode_to_stream(value.item_name, stream, True)
self._str_coder_impl.encode_to_stream(value.description, stream, True)
self._int_coder_impl.encode_to_stream(value.initial_bid, stream, True)
self._int_coder_impl.encode_to_stream(value.reserve, stream, True)
self._time_coder_impl.encode_to_stream(value.date_time, stream, True)
self._time_coder_impl.encode_to_stream(value.expires, stream, True)
self._int_coder_impl.encode_to_stream(value.seller, stream, True)
self._int_coder_impl.encode_to_stream(value.category, stream, True)
self._str_coder_impl.encode_to_stream(value.extra, stream, True)

def decode_from_stream(self, stream, nested):
id = self._int_coder_impl.decode_from_stream(stream, True)
item_name = self._str_coder_impl.decode_from_stream(stream, True)
description = self._str_coder_impl.decode_from_stream(stream, True)
initial_bid = self._int_coder_impl.decode_from_stream(stream, True)
reserve = self._int_coder_impl.decode_from_stream(stream, True)
date_time = self._time_coder_impl.decode_from_stream(stream, True)
expires = self._time_coder_impl.decode_from_stream(stream, True)
seller = self._int_coder_impl.decode_from_stream(stream, True)
category = self._int_coder_impl.decode_from_stream(stream, True)
extra = self._str_coder_impl.decode_from_stream(stream, True)
return Auction(
id,
item_name,
description,
initial_bid,
reserve,
date_time,
expires,
seller,
category,
extra)


class BidCoderImpl(coder_impl.StreamCoderImpl):
_int_coder_impl = coder_impl.VarIntCoderImpl()
_str_coder_impl = StrUtf8Coder().get_impl()
_time_coder_impl = coder_impl.TimestampCoderImpl()

def encode_to_stream(self, value, stream, nested):
self._int_coder_impl.encode_to_stream(value.auction, stream, True)
self._int_coder_impl.encode_to_stream(value.bidder, stream, True)
self._int_coder_impl.encode_to_stream(value.price, stream, True)
self._time_coder_impl.encode_to_stream(value.date_time, stream, True)
self._str_coder_impl.encode_to_stream(value.extra, stream, True)

def decode_from_stream(self, stream, nested):
auction = self._int_coder_impl.decode_from_stream(stream, True)
bidder = self._int_coder_impl.decode_from_stream(stream, True)
price = self._int_coder_impl.decode_from_stream(stream, True)
date_time = self._time_coder_impl.decode_from_stream(stream, True)
extra = self._str_coder_impl.decode_from_stream(stream, True)
return Bid(auction, bidder, price, date_time, extra)


class PersonCoderImpl(coder_impl.StreamCoderImpl):
_int_coder_impl = coder_impl.VarIntCoderImpl()
_str_coder_impl = StrUtf8Coder().get_impl()
_time_coder_impl = coder_impl.TimestampCoderImpl()

def encode_to_stream(self, value, stream, nested):
self._int_coder_impl.encode_to_stream(value.id, stream, True)
self._str_coder_impl.encode_to_stream(value.name, stream, True)
self._str_coder_impl.encode_to_stream(value.email_address, stream, True)
self._str_coder_impl.encode_to_stream(value.credit_card, stream, True)
self._str_coder_impl.encode_to_stream(value.city, stream, True)
self._str_coder_impl.encode_to_stream(value.state, stream, True)
self._time_coder_impl.encode_to_stream(value.date_time, stream, True)
self._str_coder_impl.encode_to_stream(value.extra, stream, True)

def decode_from_stream(self, stream, nested):
id = self._int_coder_impl.decode_from_stream(stream, True)
name = self._str_coder_impl.decode_from_stream(stream, True)
email = self._str_coder_impl.decode_from_stream(stream, True)
credit_card = self._str_coder_impl.decode_from_stream(stream, True)
city = self._str_coder_impl.decode_from_stream(stream, True)
state = self._str_coder_impl.decode_from_stream(stream, True)
date_time = self._time_coder_impl.decode_from_stream(stream, True)
extra = self._str_coder_impl.decode_from_stream(stream, True)
return Person(id, name, email, credit_card, city, state, date_time, extra)
Loading