Building a data pipeline using BeautifulSoup, Apache Kafka, Apache Spark Streaming and MySQL

Photo by Sigmund on Unsplash


Here is a fun little side project to help you get your feet wet with Apache Kafka and Apache Spark Streaming, and at the same time give you an overview of how different tools can work together to provide real-world value. We are going to build a simple web scraper that gets product data from amazon.com and pushes it into Kafka. A Spark job then reads the data from Kafka, does some simple processing, and saves the result in a MySQL database.

Before we start, it is important to set the scope of this article. This article will not show how to:

  • Download and install Apache Kafka, Apache Spark or MySQL
  • Download and install pyspark, kafka-python or beautifulsoup

All the code used can be found in this repo. And with that said here we go.

The Scraper

If you haven’t guessed it by now, we are going to use Python for this one. One of the most popular Python libraries for parsing HTML is BeautifulSoup.

Imagine for now that we have downloaded the web page and saved it in a variable (we will get to the downloading part later). We pass the page to the BeautifulSoup constructor:

from bs4 import BeautifulSoup
soup = BeautifulSoup(htmlFile, 'lxml')

I ran into some problems with html.parser (it didn’t catch some elements), which is why I used lxml. It is worth mentioning that lxml is also generally faster than html.parser and the other parsers BeautifulSoup supports.

If you haven’t inspected the HTML of an amazon.com page before, let me tell you that you are in for a treat: divs inside divs and lists inside lists, enough to make you feel overwhelmed (at least I did). After searching through lines of code, with a little help from Google Chrome’s inspect element tool, I realized that the title of the product is always placed in a span with the id ‘productTitle’. So the following gets us the title:

soup.find("span", attrs={"id":'productTitle'}).string.strip()

Similarly, if we search for the div with id ‘productDescription’, we will find a p element inside it with the product description:

soup.find('div',attrs={'id':'productDescription'}).find('p').string.strip()

So far so good.

For the product rating, we have to get a bit clever. The main rating is an icon of stars, and we want its alternative text, which holds the rating as text. But that text is something like ‘4.2 out of 5 stars’. A simple split on whitespace, keeping the first token, solves that problem for us.

soup.find(attrs={'id':'averageCustomerReviews'}).find("span", attrs={'class':'a-icon-alt'}).string.strip().split()[0]
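
One thing to keep in mind with the lookups above is that find returns None when an element is missing, so chaining .string.strip() raises an AttributeError on pages with a different layout. The following is a minimal sketch of a more defensive version, run against a made-up HTML snippet rather than a real Amazon page. The names SAMPLE_HTML, text_or_none and parse_product are hypothetical, and html.parser is used here only to keep the sketch free of extra dependencies.

```python
from bs4 import BeautifulSoup

# Made-up, heavily simplified stand-in for a product page (an assumption,
# not real Amazon markup).
SAMPLE_HTML = """
<html><body>
  <span id="productTitle">  Example Widget  </span>
  <div id="averageCustomerReviews">
    <span class="a-icon-alt">4.2 out of 5 stars</span>
  </div>
</body></html>
"""

def text_or_none(tag):
    """Return the stripped text of a tag, or None if the tag was not found."""
    return tag.get_text(strip=True) if tag is not None else None

def parse_product(html):
    soup = BeautifulSoup(html, 'html.parser')
    title = text_or_none(soup.find('span', attrs={'id': 'productTitle'}))

    # Look up the rating container first so a missing container does not crash
    reviews = soup.find(attrs={'id': 'averageCustomerReviews'})
    rating_tag = None
    if reviews is not None:
        rating_tag = reviews.find('span', attrs={'class': 'a-icon-alt'})
    rating_text = text_or_none(rating_tag)

    # '4.2 out of 5 stars' -> '4.2'
    rating = rating_text.split()[0] if rating_text else None
    return {'productTitle': title, 'productRating': rating}

product = parse_product(SAMPLE_HTML)
print(product)
```

On a page where neither element exists, parse_product returns None for both fields instead of raising.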

What about the product categories? Well, there is no single element on the page that lists the categories. But up high at the top there are some links used for navigation.

Image: Amazon saves the categories of the product in some strange places

This can work for us. I chose to get all the categories and put them in a single comma-separated string. Chrome’s inspect element tool led me to exactly which element to look for. For better maintainability, I decided to make it a function:

def GetCategories(div):
    # Join the text of every breadcrumb link into one comma-separated string
    categories = ','.join([e.string.strip() for e in div.find_all(['li','a'],attrs={'class':'a-link-normal'})])
    return categories

The function is called like this:

GetCategories(soup.find(attrs={'id':'wayfinding-breadcrumbs_feature_div'}))

Lastly, the product prices. This was a bit tricky, as there were two ways Amazon displayed prices. First, there was the simple and easy way of displaying the price directly on the same page.

Image: The price is right under the title. And they are nice enough to display the fees next to it

And then there is the hard way. You see, for some products there are multiple offers and prices.

Image: Multiple prices are handled differently

These prices are not stored on the same page. They are stored on another page, which is loaded when the above link is clicked.

Image: Multiple prices to choose from

So we need a function that looks for the price right under the title and, if it isn’t there, follows the link and gets the prices from there. And since we have already downloaded an extra page, I figured I should retrieve all the prices and join them in one string separated by a delimiter.

Note that some products may show prices both ways. In this case I chose to take the easy way and extract the one on the same page.

I also felt that this function in particular needs a good try/except.

import re

def GetPrices(soup):
    try:
        # Easy way: the price is displayed right under the title
        priceDiv = soup.find('div',attrs={'id':'price'})
        priceSpan = priceDiv.find('span',attrs={'id':'priceblock_ourprice'}) if priceDiv is not None else None
        if priceSpan is not None:
            return priceSpan.string.strip()
        # Hard way: follow the link to the page that lists all the offers
        offersDiv = soup.find('div',attrs={'id':re.compile('olp-upd-new')})
        if offersDiv is None:
            return None
        link = offersDiv.find('a',attrs={'class':'a-link-normal'})['href']
        webPage = DownloadPage('https://www.amazon.com'+link)
        miniSoup = BeautifulSoup(webPage,'lxml').body
        productPrices = [e.find('span',attrs={'class':'olpOfferPrice'}).string.strip() for e in miniSoup.find_all('div',attrs={'class':'olpOffer'})]
        return '#'.join(productPrices)
    except Exception as e:
        # An unexpected page layout ends up here; the price is reported as missing
        print(e)
        return None

And how exactly do we download a web page? It is not that hard. We just use the requests package with some headers, and voilà:

import requests

def DownloadPage(link):
    # Send browser-like headers with the request
    header = {
        'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36',
        'Accept-Language':'en-US, en;q=0.5',
    }
    webPage = requests.get(link,headers=header)
    return webPage.content

Now our little scraper is almost finished. All that is missing is extracting the URLs of similar products and adding them to a list so that it can go on forever. Well, actually not forever. You see, Amazon, like any company out there, wants to protect its website from cyber attacks. So after around 50-ish requests, amazon.com will deny every request you send, saying something like ‘please use our API instead of crawling our website’. So what do we do? Nothing. Just lie down and accept our fate. After all, this is just a demo.

Where were we? Ah yes, extracting the URLs of the similar products:

def GetProducts(soup):
    links = []
    for div in soup.find_all('div',attrs={'id':'similarities_feature_div'}):
        carousel = div.find('ol',class_='a-carousel')
        if carousel is not None:
            for e in carousel.find_all('a',attrs={'class':'a-link-normal'}):
                # Keep only anchors whose sole class is a-link-normal
                if len(e['class']) == 1:
                    links.append(e['href'])
    return links
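
To give an idea of how the pieces above could be tied together, here is a minimal sketch of a crawl loop that keeps a queue of links to visit and a set of links already seen. It is not the code from the repo: crawl and its fetch, extract_links and handle parameters are hypothetical names, and the example runs against a tiny in-memory "site" instead of amazon.com. The max_pages limit is an assumption that mirrors the 50-ish requests mentioned above.

```python
from collections import deque

def crawl(start_url, fetch, extract_links, handle, max_pages=50):
    """Breadth-first crawl that visits each URL at most once.

    fetch(url) -> page, extract_links(page) -> iterable of URLs and
    handle(url, page) are hooks supplied by the caller.
    """
    queue = deque([start_url])
    seen = {start_url}
    visited = 0
    while queue and visited < max_pages:
        url = queue.popleft()
        page = fetch(url)
        handle(url, page)
        visited += 1
        for link in extract_links(page):
            # Skip links that are already queued or visited
            if link not in seen:
                seen.add(link)
                queue.append(link)
    return visited

# Tiny in-memory "site": each page is just the list of links it contains
fake_site = {'/a': ['/b', '/c'], '/b': ['/a', '/c'], '/c': []}
visited_order = []
count = crawl(
    '/a',
    fetch=lambda url: fake_site[url],
    extract_links=lambda page: page,
    handle=lambda url, page: visited_order.append(url),
)
print(count, visited_order)
```

In the scraper, fetch would roughly correspond to DownloadPage followed by BeautifulSoup, extract_links to GetProducts, and handle to the code that extracts the product fields and sends them to Kafka. Since GetProducts returns the href values as found on the page, they may need the 'https://www.amazon.com' prefix before being downloaded, as in GetPrices.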

The Kafka Producer

Writing a Kafka producer is really simple. All you have to do is write two or three lines:

producer = KafkaProducer()
producer.send('amazon', pickle.dumps(product))

The first line declares the producer. Usually you pass the list of bootstrap servers as an argument, but if you leave it empty it will default to localhost:9092, which is exactly what I have.

The second line calls the send method, to which you pass the name of the topic and the bytes you want to send. You can also pass a key, but I don’t need one.

The topic is ‘amazon’. As for the bytes, there are plenty of options for sending all the data we extracted. I chose to put everything in a dictionary and serialize it using pickle.

Just a quick note: the send method of the producer is asynchronous. That is good for performance, but make sure you flush before closing the program.
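
The send-then-flush pattern can be sketched as follows. This is an illustration under assumptions, not the repo's code: publish_products is a hypothetical helper, and RecordingProducer is a made-up stand-in that only mimics the send and flush methods so the sketch can run without a Kafka broker.

```python
import pickle

def publish_products(producer, topic, products):
    """Pickle each product dict, send it, then flush before returning."""
    for product in products:
        producer.send(topic, pickle.dumps(product))
    # send() only queues the message; flush() waits until queued messages are sent
    producer.flush()

class RecordingProducer:
    """Made-up stand-in with the same send/flush methods, for local experiments."""
    def __init__(self):
        self.pending = []
        self.delivered = []

    def send(self, topic, value):
        self.pending.append((topic, value))

    def flush(self):
        self.delivered.extend(self.pending)
        self.pending.clear()

producer = RecordingProducer()
publish_products(producer, 'amazon', [
    {'productTitle': 'Example Widget', 'productPrices': '$8.99#$20.99'},
])

# The bytes on the topic can be turned back into the original dictionary
topic, payload = producer.delivered[0]
print(topic, pickle.loads(payload))
```

With the real library, the same helper should work when given a KafkaProducer() instance, since it only relies on send and flush.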

The Spark Job

For the streaming part, we will use the newer Structured Streaming API. This has the benefit of using the same functions and methods as the structured batch processing API, as you will see.

First, let’s define the Spark session:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[2]').appName('StreamingDemo').getOrCreate()

Then we read from our Kafka source

df = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers','localhost:9092')\
    .option('subscribe','amazon')\
    .load()

The dataframe we read will contain columns like value, key, timestamp, etc. What we care about is the value column. However, there is a problem. The value column carries the serialized data received from the Kafka server. We know that we can use pickle’s loads function to deserialize it, but how can we use it on a dataframe? The answer is to use it inside a user-defined function.

deser = udf(lambda x: pickle.loads(x),MapType(StringType(),StringType()))
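
Since the return type is declared as a map from string to string, it helps if every key and value coming out of the function really is a string (or None). Below is a small sketch of what the function inside the UDF could look like with that normalization added. deserialize_product is a hypothetical name, and the sample dictionary is made up; it runs in plain Python, outside Spark.

```python
import pickle

def deserialize_product(raw):
    """Unpickle one Kafka message value into a dict of strings (None stays None)."""
    # Only unpickle data you produced yourself: pickle.loads can execute code
    product = pickle.loads(raw)
    return {str(key): (None if value is None else str(value))
            for key, value in product.items()}

# Simulate the bytes that would arrive in the 'value' column
raw_value = pickle.dumps({
    'productTitle': 'Example Widget',
    'productRating': 4.2,       # a float sneaks in here on purpose
    'productPrices': None,      # e.g. a price that could not be scraped
})
row_map = deserialize_product(raw_value)
print(row_map)
```

In the job, this function could be wrapped the same way as the lambda: udf(deserialize_product, MapType(StringType(), StringType())).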

Great. So we add a column to our dataframe with the deserialized value

deserlizedDF = df.withColumn('map',deser(df['value']))

Then we use that dataframe to extract the columns from the deserialized dictionary.

parsedDF = deserlizedDF.withColumn('title',element_at('map','productTitle'))\
    .withColumn('Categories',element_at('map','productCategories'))\
    .withColumn('Rating',element_at('map','productRating'))\
    .withColumn('Description',element_at('map','productDescription'))\
    .withColumn('Prices',element_at('map','productPrices'))\

Assume that we want to extract the minimum and maximum prices of a product. First we split the prices string, then cast all the elements to float so that we can compare them. Numerical comparison is the correct way to compare, as string comparison would report that 8.99 is larger than 20.99.

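.withColumn('Min_Price',array_min(split(element_at('map','productPrices'),r'#*\$').cast(ArrayType(FloatType()))))\
.withColumn('Max_Price',array_max(split(element_at('map','productPrices'),r'#*\$').cast(ArrayType(FloatType()))))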

The regular expression used for the split may look weird because it both splits the string and removes the $ before every price. That is a good reminder to clean the data at the source before sending it to Kafka, to save some needless transformations afterwards.
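
To see what that regular expression does, and why the cast to float matters, here is the same idea in plain Python. This only mirrors the Spark expression with Python's re module; the sample prices string is made up, using the '#' delimiter from GetPrices.

```python
import re

prices = '$8.99#$20.99'

# "Zero or more #, then a $": removes both the delimiter and the dollar sign
parts = re.split(r'#*\$', prices)

# The leading '$' leaves an empty first element; skip it before converting
values = [float(p) for p in parts if p]

string_max = max(p for p in parts if p)   # compares character by character
numeric_max = max(values)                 # compares the actual numbers
numeric_min = min(values)

print(parts)        # ['', '8.99', '20.99']
print(string_max)   # 8.99
print(numeric_max)  # 20.99
```

In Spark 3.0 with default settings, the empty first element should become null when cast to float, and array_min and array_max skip null elements, so no extra filtering is needed there.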

Lastly, we select only the data we care about from the dataframe

projectedDF = parsedDF.select('title','Categories','Rating','Prices','Min_Price','Max_Price')

What is left now is saving the result into the MySQL database. Spark Structured Streaming supports many targets, or sinks as they are called, directly. But databases are not one of them (I know, shocking!). It still lets you write the result of your stream to a database, just not directly.

Let me explain.

In addition to the many sinks Spark provides directly, it provides two methods to use for other sinks: foreach and foreachBatch. foreach deals with each row appended to the stream, while foreachBatch deals with each micro-batch. We pass a function to either method to handle the data we receive. In our case using foreach feels like a bit of overkill, so I chose foreachBatch:

result = projectedDF.writeStream.foreachBatch(foreach_jdbc_writer).start()

The function passed to foreachBatch receives a dataframe representing the batch and a unique id for the batch. It is inside this function that we use the dataframe object to write the data into the MySQL database:

def foreach_jdbc_writer(df,epoch_id):
    # Append the rows of this micro-batch to the amazon_products table
    df.write.\
        jdbc(url="jdbc:mysql://localhost/world",table="amazon_products",mode='append',properties={"driver":"com.mysql.cj.jdbc.Driver","user":"ahmed"})

Finally, we add the one line you should never forget in a Spark streaming job:

result.awaitTermination()

Also, don’t forget to add

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 --jars mysql-connector-java-8.0.20.jar

to your Spark Application call.

And that’s it. Fire up your Spark job and your scraper, and enjoy a cup of coffee while your scraper embarks on an adventure in the land of amazon.com. But make it instant coffee, as it will be a short adventure.

See you Space Cowboy