I am not very familiar with Spark, so any advice from the Spark community would be awesome.
ips.csv

```csv
IP
8.8.8.8
32.165.16.84
530f:239a:b09d:3e98:56a1:6ef3:9174:acf0
....
```
MMDB-based enrichment
MMDB-based enrichment is generally the fastest way to look up IPs against an IP database. MMDB is a binary format built specifically for IP addresses: each lookup is a short walk through a search tree keyed on the bits of the address, not a scan or join over IP ranges.
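To illustrate why this kind of lookup is fast, here is a toy sketch of the idea in plain Python. It is not the real MMDB file format or the `maxminddb` library. Networks go into a binary trie keyed on the address bits, so a lookup takes at most 32 (IPv4) or 128 (IPv6) steps no matter how many networks are stored, and the most specific matching network wins. The names `Node`, `make_roots`, `insert`, and `lookup` are hypothetical. The sample networks are documentation-reserved ranges with placeholder records.

```python
import ipaddress


class Node:
    """One trie node: one child per bit value, plus an optional record."""

    __slots__ = ("children", "record")

    def __init__(self):
        self.children = [None, None]  # children[0] for bit 0, children[1] for bit 1
        self.record = None            # set where a network prefix ends


def make_roots():
    # Separate trees for IPv4 (32-bit) and IPv6 (128-bit) addresses
    return {4: Node(), 6: Node()}


def insert(roots, cidr, record):
    net = ipaddress.ip_network(cidr)
    bits, width = int(net.network_address), net.max_prefixlen
    node = roots[net.version]
    # Create one node per prefix bit, most significant bit first
    for i in range(net.prefixlen):
        bit = (bits >> (width - 1 - i)) & 1
        if node.children[bit] is None:
            node.children[bit] = Node()
        node = node.children[bit]
    node.record = record


def lookup(roots, ip):
    addr = ipaddress.ip_address(ip)
    bits, width = int(addr), addr.max_prefixlen
    node, best = roots[addr.version], None
    # At most `width` steps; remember the most specific match seen so far
    for i in range(width):
        if node.record is not None:
            best = node.record
        node = node.children[(bits >> (width - 1 - i)) & 1]
        if node is None:
            return best
    return node.record if node.record is not None else best


roots = make_roots()
insert(roots, "192.0.2.0/24", "net-A")
insert(roots, "192.0.2.128/25", "net-B")
insert(roots, "2001:db8::/32", "net-C")
print(lookup(roots, "192.0.2.1"))    # net-A
print(lookup(roots, "192.0.2.200"))  # net-B (longest prefix wins)
print(lookup(roots, "2001:db8::1"))  # net-C
print(lookup(roots, "203.0.113.5"))  # None
```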
The code does the following:
- Initializes a Spark session using `create_spark_session`.
- Loads IP data from a CSV file (`ips.csv`) into a Spark DataFrame using `load_ips`.
- Defines a UDF (`enrich_ip_udf`) to query the MMDB for metadata associated with each IP address. The metadata includes fields like `country`, `continent`, `asn`, `as_name`, etc.
- Applies the enrichment UDF to the IP DataFrame, creating a new column (`enriched`) that contains the metadata for each IP.
- Extracts individual metadata fields from the `enriched` column and creates a final DataFrame (`df_final`) with the enriched data.
- Saves the final enriched DataFrame (`df_final`) to a local Spark SQL table named `localdb.ip_enriched`.
- Queries the enriched table to display a preview of the data, including columns like `IP`, `country`, `asn`, and `as_name`.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType
import maxminddb


def create_spark_session():
    return SparkSession.builder \
        .appName("IPinfoLiteEnrichment") \
        .master("local[*]") \
        .getOrCreate()


def load_ips(spark, file_path):
    # Expects a header row with an "IP" column
    return spark.read.csv(file_path, header=True, inferSchema=True)


def enrich_ip_udf(mmdb_path):
    # Struct returned for each IP; field names match the IPinfo Lite MMDB records
    schema = StructType([
        StructField("country", StringType(), True),
        StructField("country_code", StringType(), True),
        StructField("continent", StringType(), True),
        StructField("continent_code", StringType(), True),
        StructField("asn", StringType(), True),
        StructField("as_name", StringType(), True),
        StructField("as_domain", StringType(), True),
    ])

    def enrich_ip(ip):
        # Open the MMDB lazily on the executor and reuse it, instead of once per row
        if not hasattr(enrich_ip, "reader"):
            enrich_ip.reader = maxminddb.open_database(mmdb_path)
        try:
            data = enrich_ip.reader.get(ip) or {}
            return (
                data.get("country", ""),
                data.get("country_code", ""),
                data.get("continent", ""),
                data.get("continent_code", ""),
                data.get("asn", ""),
                data.get("as_name", ""),
                data.get("as_domain", ""),
            )
        except Exception:
            # Invalid or null IPs get empty fields instead of failing the job
            return ("", "", "", "", "", "", "")

    return udf(enrich_ip, schema)


def main():
    # In local mode this relative path works as-is; on a cluster every executor
    # needs its own copy (e.g. via SparkContext.addFile and SparkFiles.get)
    mmdb_path = "ipinfo_lite.mmdb"
    input_file = "ips.csv"

    spark = create_spark_session()
    df = load_ips(spark, input_file)

    # Add a struct column holding all MMDB fields for each IP
    enrich_udf = enrich_ip_udf(mmdb_path)
    df_enriched = df.withColumn("enriched", enrich_udf(df["IP"]))

    # Flatten the struct into top-level columns
    df_final = df_enriched.select(
        "*",
        "enriched.country",
        "enriched.country_code",
        "enriched.continent",
        "enriched.continent_code",
        "enriched.asn",
        "enriched.as_name",
        "enriched.as_domain",
    ).drop("enriched")

    # Optionally write to file or table here
    spark.sql("CREATE DATABASE IF NOT EXISTS localdb")
    df_final.write.mode("overwrite").saveAsTable("localdb.ip_enriched")
    spark.sql("SELECT IP, country, asn, as_name FROM localdb.ip_enriched LIMIT 10").show(truncate=False)


if __name__ == "__main__":
    main()
```
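One possible refinement, offered as a sketch rather than a tested Spark change: move the per-row logic of the `enrich_ip` closure into a plain function whose reader opener is injected. That makes the lazy, open-once behavior and the field order testable without Spark or the MMDB file. `make_enricher`, `StubReader`, and `open_stub` are hypothetical names, and the stub's record ("Exampleland", documentation ASN AS64496) is placeholder data. In the Spark job it would presumably be wrapped as `udf(make_enricher(lambda: maxminddb.open_database(mmdb_path)), schema)`.

```python
import ipaddress

# Output order must match the StructType schema used for the UDF
FIELDS = ("country", "country_code", "continent", "continent_code",
          "asn", "as_name", "as_domain")


def make_enricher(open_reader):
    """Build a per-row enrichment function that opens its reader lazily, once."""
    state = {}

    def enrich(ip):
        if "reader" not in state:
            state["reader"] = open_reader()
        try:
            data = state["reader"].get(ip) or {}
            return tuple(data.get(field, "") for field in FIELDS)
        except Exception:
            # Invalid or null IPs become empty fields instead of failing the row
            return ("",) * len(FIELDS)

    return enrich


class StubReader:
    """Hypothetical stand-in for a maxminddb reader, for testing without the file."""

    def get(self, ip):
        ipaddress.ip_address(ip)  # raise ValueError for malformed input
        if ip == "192.0.2.1":
            return {"country": "Exampleland", "asn": "AS64496"}
        return None


open_calls = []


def open_stub():
    open_calls.append(1)
    return StubReader()


enrich = make_enricher(open_stub)
print(enrich("192.0.2.1"))     # ('Exampleland', '', '', '', 'AS64496', '', '')
print(enrich("198.51.100.7"))  # ('', '', '', '', '', '', '')
print(enrich("not-an-ip"))     # ('', '', '', '', '', '', '')
print(len(open_calls))         # 1
```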
In-Spark enrichment via join (not recommended)
As far as I know, Spark does not have a native IP geolocation lookup mechanism, nor does it have an IP address data type. That leaves you with a range join: every CIDR in the IP database has to be converted into a numeric start_ip and end_ip, and each IP is matched with start_ip <= ip <= end_ip. This is not efficient, because a join with only range conditions and no equality key cannot use Spark's hash or sort-merge joins and is typically executed as a nested loop join. Snowflake has the same limitation, which is why we export a custom dataset with start_ip, end_ip, and join_key.
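The idea behind a join_key can be sketched in plain Python: bucket every range by a prefix of its address, match on that bucket with an equality (which a database can hash or sort on), and only then apply the start/end range check to the few candidates in the bucket. This is an illustration under assumptions, not the exported dataset's actual definition: the /16 bucket size, the IPv4-only scope, and the names `cidr_to_rows`, `build_index`, and `bucket_lookup` are all hypothetical. Networks wider than the bucket (here a /15) are repeated once per bucket they cover.

```python
import ipaddress
from collections import defaultdict

BUCKET_BITS = 16  # hypothetical: join_key = the /16 an address falls in


def cidr_to_rows(cidr, record):
    """Expand one IPv4 network into (join_key, start, end, record) rows."""
    net = ipaddress.IPv4Network(cidr, strict=False)
    start, end = int(net.network_address), int(net.broadcast_address)
    shift = 32 - BUCKET_BITS
    return [(key, start, end, record)
            for key in range(start >> shift, (end >> shift) + 1)]


def build_index(networks):
    index = defaultdict(list)
    for cidr, record in networks:
        for key, start, end, rec in cidr_to_rows(cidr, record):
            index[key].append((start, end, rec))
    return index


def bucket_lookup(index, ip):
    value = int(ipaddress.IPv4Address(ip))
    key = value >> (32 - BUCKET_BITS)
    # Equality on the join key narrows the candidates; the range check finishes
    for start, end, rec in index.get(key, []):
        if start <= value <= end:
            return rec
    return None


networks = [("192.0.2.0/24", "net-A"), ("198.18.0.0/15", "net-B")]
index = build_index(networks)
print(len(cidr_to_rows("198.18.0.0/15", "net-B")))  # 2 (spans two /16 buckets)
print(bucket_lookup(index, "198.19.4.4"))   # net-B
print(bucket_lookup(index, "192.0.2.55"))   # net-A
print(bucket_lookup(index, "203.0.113.9"))  # None
```

In Spark terms, the same shape would be an equi-join on the join key combined with the range condition, which gives the planner an equality key to work with.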
So, in general, enriching data inside the platform via IP address joins is going to be inefficient. Also, CSV is not the best format for the IP database here; Parquet (columnar, compressed, with a stored schema) is a better choice.
That said, here is my code. Let me know if you can suggest any improvements:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import ipaddress


def create_spark_session():
    return SparkSession.builder \
        .appName("IPinfoLiteEnrichment") \
        .master("local[*]") \
        .getOrCreate()


def load_data(spark, ip_file, cidr_file):
    df_ips = spark.read.csv(ip_file, header=True, inferSchema=True)
    df_ipinfo = spark.read.csv(cidr_file, header=True, inferSchema=True)
    return df_ips, df_ipinfo


def to_key(version, value):
    # LongType is a signed 64-bit integer, so IPv6 (128-bit) values overflow it.
    # Use a fixed-width hex string instead: equal-length lowercase hex strings sort
    # like the integers they encode, and the version prefix keeps IPv4 and IPv6 apart.
    return f"{version}:{value:032x}"


def ip_to_key(ip):
    try:
        addr = ipaddress.ip_address(ip)
        return to_key(addr.version, int(addr))
    except Exception:
        return None  # invalid or null IP: no match in the join


def cidr_to_range(cidr):
    try:
        net = ipaddress.ip_network(cidr, strict=False)
        return (to_key(net.version, int(net.network_address)),
                to_key(net.version, int(net.broadcast_address)))
    except Exception:
        return None, None


def main():
    ip_file = "ips.csv"
    cidr_file = "ipinfo_lite.csv"

    spark = create_spark_session()
    df_ips, df_ipinfo = load_data(spark, ip_file, cidr_file)

    ip_key_udf = udf(ip_to_key, StringType())

    @udf("struct<start:string,end:string>")
    def cidr_range_udf(cidr):
        return cidr_to_range(cidr)

    # Convert each CIDR in the "network" column into start/end keys
    df_ipinfo_ranges = df_ipinfo.withColumn("range", cidr_range_udf(col("network"))) \
        .withColumn("start", col("range.start")) \
        .withColumn("end", col("range.end")) \
        .drop("range")

    df_ips_key = df_ips.withColumn("ip_key", ip_key_udf(col("IP")))

    # Range-only join condition: there is no equality key, so Spark cannot use a
    # hash or sort-merge join and typically falls back to a nested loop join
    df_enriched = df_ips_key.join(
        df_ipinfo_ranges,
        (df_ips_key["ip_key"] >= df_ipinfo_ranges["start"]) &
        (df_ips_key["ip_key"] <= df_ipinfo_ranges["end"]),
        how="left"
    )

    # Optionally save to a Spark table or file
    spark.sql("CREATE DATABASE IF NOT EXISTS localdb")
    df_enriched.write.mode("overwrite").saveAsTable("localdb.ip_enriched")
    spark.sql("SELECT IP, country, asn, as_name FROM localdb.ip_enriched LIMIT 10").show(truncate=False)


if __name__ == "__main__":
    main()
```
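One direction for improvement, sketched in plain Python and untested in Spark: the join above effectively compares each IP against every range. If the networks do not overlap (an assumption here), each IP falls into at most one of them. Sorting the ranges by start address then lets a binary search find the only candidate in O(log n) steps. In Spark, the sorted arrays could perhaps be broadcast and queried from a UDF, but that wiring is not shown. `build_sorted_ranges` and `range_lookup` are hypothetical names, and the sample networks are documentation ranges.

```python
import bisect
import ipaddress


def build_sorted_ranges(networks):
    """Return (starts, rows) sorted by (version, start); assumes no overlaps."""
    rows = []
    for cidr, record in networks:
        net = ipaddress.ip_network(cidr, strict=False)
        rows.append((net.version, int(net.network_address),
                     int(net.broadcast_address), record))
    rows.sort()
    starts = [(version, start) for version, start, _, _ in rows]
    return starts, rows


def range_lookup(starts, rows, ip):
    addr = ipaddress.ip_address(ip)
    probe = (addr.version, int(addr))
    # Index of the last range whose start is <= the IP
    i = bisect.bisect_right(starts, probe) - 1
    if i >= 0:
        version, start, end, record = rows[i]
        if version == addr.version and start <= int(addr) <= end:
            return record
    return None


starts, rows = build_sorted_ranges([
    ("192.0.2.0/24", "net-A"),
    ("198.51.100.0/24", "net-B"),
    ("2001:db8::/32", "net-C"),
])
print(range_lookup(starts, rows, "192.0.2.10"))   # net-A
print(range_lookup(starts, rows, "203.0.113.1"))  # None
print(range_lookup(starts, rows, "2001:db8::1"))  # net-C
```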
The MMDB approach was very fast, as expected, but the CSV join-based enrichment was slow. Parquet would probably be somewhat faster than CSV, but it only speeds up ingesting the IP database itself, not the range join.
If you really want to do the enrichment inside Spark, we can deliver our Snowflake dataset as a data download that includes join_key, start_ip, and end_ip, with the IPs converted to int/hex. This helps join performance. However, writing the custom Spark UDFs and join logic would still require some engineering effort on your end.
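Some context on the int/hex conversion: Spark's `LongType` is a signed 64-bit integer, so it can store any IPv4 address as an integer but not an IPv6 address (128-bit). A common workaround is a fixed-width, zero-padded hex string. Equal-length lowercase hex strings compare in the same order as the integers they encode, so plain string comparison works for range checks. This sketch assumes a `"<version>:<32 hex digits>"` layout, which may not match the format of the actual download; `to_hex_key` and `numeric_order` are hypothetical names.

```python
import ipaddress

LONG_MAX = 2**63 - 1  # largest value of Spark's signed 64-bit LongType


def to_hex_key(ip):
    addr = ipaddress.ip_address(ip)
    # "<version>:" keeps IPv4 and IPv6 apart; 032x zero-pads to 32 hex digits,
    # enough for any 128-bit value, so all keys have the same length
    return f"{addr.version}:{int(addr):032x}"


def numeric_order(ip):
    addr = ipaddress.ip_address(ip)
    return (addr.version, int(addr))


ips = ["8.8.8.8", "32.165.16.84", "10.0.0.1",
       "530f:239a:b09d:3e98:56a1:6ef3:9174:acf0", "2001:db8::1"]

# Plain string sorting of the keys matches numeric sorting of the addresses
print(sorted(ips, key=to_hex_key) == sorted(ips, key=numeric_order))  # True
# An IPv6 address does not fit in a LongType column as a plain integer
print(int(ipaddress.ip_address(ips[3])) > LONG_MAX)  # True
```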
Whichever data platform you choose, MMDB lookups are hard to beat on performance. So I would focus on enrichment via the MMDB database first, and then look at how to ingest the enriched data into your data platform.