Extracting IP Addresses and HTTP Codes in PySpark
In this tutorial, we'll load a text file into an RDD and convert it into a DataFrame. Using web server log data, we'll extract IP addresses and HTTP status codes with PySpark, then store them in a DataFrame for further analysis.
Let's consider the following sample log data:
83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
83.149.9.216 - - [17/May/2015:10:05:43 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" 200 171717 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
First, we need to initialize a Spark session in PySpark:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.master("local").appName("read log file using pyspark").getOrCreate()
Next, we'll create an RDD (Resilient Distributed Dataset) from the log data:
raw_rdd = spark_session.sparkContext.textFile("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")
# split on the first two hyphens only, so hyphens inside URLs stay intact
rdd = raw_rdd.map(lambda line: line.split('-', 2))
rdd.foreach(lambda data: print(data))  # prints to the console only when running in local mode
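Each printed record is now a three-element list: the IP address, the empty field between the two hyphens, and the remainder of the line containing the request and status code. For the first sample line it should look roughly like this:
['83.149.9.216 ', ' ', ' [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 ...']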
Next, we define a function that extracts the IP address and the HTTP status code from each split record:
def process(fields):
    output_list = []
    output_list.append(fields[0].strip())  # the IP address, stripped of surrounding whitespace
    if 'HTTP/1.1" 200' in fields[2]:
        output_list.append("200")
    elif 'HTTP/1.1" 404' in fields[2]:
        output_list.append("404")
    else:
        output_list.append("unknown")
    return output_list
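To sanity-check the function outside Spark, you can call it on a manually split line; the sample value below is shortened for readability:
sample = '83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /images/kibana-search.png HTTP/1.1" 200 203023'
print(process(sample.split('-', 2)))  # ['83.149.9.216', '200']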
We apply this function to every record, then define a schema and build the DataFrame:
from pyspark.sql.types import StructType, StructField, StringType

transformed_rdd = rdd.map(process)
schema = StructType([
    StructField("ip", StringType(), True),
    StructField("http_code", StringType(), True),
])
final_rdd = transformed_rdd.map(lambda line: (line[0], line[1]))
df = spark_session.createDataFrame(final_rdd, schema)
df.show()
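For the two sample log lines, df.show() should produce something like:
+------------+---------+
|          ip|http_code|
+------------+---------+
|83.149.9.216|      200|
|83.149.9.216|      200|
+------------+---------+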
Putting it all together, the complete script looks like this:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

def process(fields):
    output_list = []
    output_list.append(fields[0].strip())  # the IP address
    if 'HTTP/1.1" 200' in fields[2]:
        output_list.append("200")
    elif 'HTTP/1.1" 404' in fields[2]:
        output_list.append("404")
    else:
        output_list.append("unknown")
    return output_list

spark_session = SparkSession.builder.master("local").appName("test").getOrCreate()
raw_rdd = spark_session.sparkContext.textFile("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")
# split on the first two hyphens only, so hyphens inside URLs stay intact
rdd = raw_rdd.map(lambda line: line.split('-', 2))
rdd.foreach(lambda data: print(data))  # prints to the console only when running in local mode
transformed_rdd = rdd.map(process)
schema = StructType([
    StructField("ip", StringType(), True),
    StructField("http_code", StringType(), True),
])
final_rdd = transformed_rdd.map(lambda line: (line[0], line[1]))
df = spark_session.createDataFrame(final_rdd, schema)
df.show()
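As a side note, the same extraction can be done without dropping to the RDD API at all. The sketch below is an alternative, not part of the approach above: it reads the file as a DataFrame and uses regexp_extract to pull out the IP and status code. The regular expressions are illustrative and assume the common Apache access-log layout shown in the sample data.
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract

spark = SparkSession.builder.master("local").appName("log regex sketch").getOrCreate()

# each line of the file becomes one row in a single "value" column
logs = spark.read.text("/Users/apple/PycharmProjects/pyspark/data/text/log.txt")
df = logs.select(
    regexp_extract("value", r'^(\S+)', 1).alias("ip"),                # leading token = client IP
    regexp_extract("value", r'"\s+(\d{3})\s', 1).alias("http_code"),  # 3-digit code after the quoted request
)
df.show()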