Using IPinfo's data in Spark (with PySpark)

I am not very familiar with Spark, so any advice from the Spark community would be much appreciated.

ips.csv

IP
8.8.8.8
32.165.16.84
530f:239a:b09d:3e98:56a1:6ef3:9174:acf0
....

MMDB-based enrichment

MMDB-based enrichment will almost always be the fastest way to look up IPs against an IP database. MMDB is a binary database format designed specifically for IP address lookups.
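To illustrate why an MMDB lookup is cheap, here is a toy longest-prefix-match binary trie using only Python's standard library. It is a simplified sketch of the idea behind the MMDB search tree (walk one address bit per level), not the real MMDB file format; the `PrefixTrie` class is hypothetical and exists only for this illustration.

```python
import ipaddress

class PrefixTrie:
    """Toy longest-prefix-match binary trie (illustration only, not the MMDB format)."""

    def __init__(self):
        # Separate roots so IPv4 and IPv6 bit strings never share nodes.
        self.roots = {4: {}, 6: {}}

    @staticmethod
    def _bits(addr):
        # Address as a fixed-width bit string: 32 chars for IPv4, 128 for IPv6.
        return format(int(addr), f"0{addr.max_prefixlen}b")

    def insert(self, cidr, record):
        net = ipaddress.ip_network(cidr, strict=False)
        node = self.roots[net.version]
        # Create one node per prefix bit, then attach the record at the end.
        for bit in self._bits(net.network_address)[: net.prefixlen]:
            node = node.setdefault(bit, {})
        node["record"] = record

    def lookup(self, ip):
        addr = ipaddress.ip_address(ip)
        node, best = self.roots[addr.version], None
        # At most 32 or 128 steps, however many networks were inserted.
        for bit in self._bits(addr):
            if "record" in node:
                best = node["record"]  # remember the longest match so far
            if bit not in node:
                return best
            node = node[bit]
        return node.get("record", best)
```

For example, with both 192.0.2.0/24 and 192.0.0.0/16 inserted, `lookup("192.0.2.55")` returns the /24 record because it is the longer match. The cost of a lookup depends on the address length, not on the number of networks in the database.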

The PySpark script below does the following:

  1. Initializes a Spark session using create_spark_session.
  2. Loads IP data from a CSV file (ips.csv) into a Spark DataFrame using load_ips.
  3. Defines a UDF (enrich_ip_udf) to query the MMDB for metadata associated with each IP address. The metadata includes fields like country, continent, asn, as_name, etc.
  4. Applies the enrichment UDF to the IP DataFrame, creating a new column (enriched) that contains the metadata for each IP.
  5. Extracts individual metadata fields from the enriched column and creates a final DataFrame (df_final) with the enriched data.
  6. Saves the final enriched DataFrame (df_final) to a local Spark SQL table named localdb.ip_enriched.
  7. Queries the enriched table to display a preview of the data, including columns like IP, country, asn, and as_name.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType
import maxminddb

def create_spark_session():
    return SparkSession.builder \
        .appName("IPinfoLiteEnrichment") \
        .master("local[*]") \
        .getOrCreate()

def load_ips(file_path):
    return spark.read.csv(file_path, header=True, inferSchema=True)

def enrich_ip_udf(mmdb_path):
    schema = StructType([
        StructField("country", StringType(), True),
        StructField("country_code", StringType(), True),
        StructField("continent", StringType(), True),
        StructField("continent_code", StringType(), True),
        StructField("asn", StringType(), True),
        StructField("as_name", StringType(), True),
        StructField("as_domain", StringType(), True),
    ])

    def enrich_ip(ip):
        if not hasattr(enrich_ip, "reader"):
            enrich_ip.reader = maxminddb.open_database(mmdb_path)
        try:
            data = enrich_ip.reader.get(ip) or {}
            return (
                data.get("country", ""),
                data.get("country_code", ""),
                data.get("continent", ""),
                data.get("continent_code", ""),
                data.get("asn", ""),
                data.get("as_name", ""),
                data.get("as_domain", ""),
            )
        except Exception:
            return ("", "", "", "", "", "", "")

    return udf(enrich_ip, schema)

def main():
    mmdb_path = "ipinfo_lite.mmdb"
    input_file = "ips.csv"

    global spark
    spark = create_spark_session()
    df = load_ips(input_file)

    enrich_udf = enrich_ip_udf(mmdb_path)
    df_enriched = df.withColumn("enriched", enrich_udf(df["IP"]))

    df_final = df_enriched.select(
        "*",
        "enriched.country",
        "enriched.country_code",
        "enriched.continent",
        "enriched.continent_code",
        "enriched.asn",
        "enriched.as_name",
        "enriched.as_domain",
    ).drop("enriched")

    # Save the result as a Spark SQL table (writing Parquet files is another option)
    spark.sql("CREATE DATABASE IF NOT EXISTS localdb")
    df_final.write.mode("overwrite").saveAsTable("localdb.ip_enriched")

    spark.sql("SELECT IP, country, asn, as_name FROM localdb.ip_enriched LIMIT 10").show(truncate=False)

if __name__ == "__main__":
    main()
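One possible refinement, sketched under assumptions: instead of caching the reader on a function attribute, ship the MMDB file to executors with `SparkContext.addFile` and open it once per partition inside `RDD.mapPartitions` (`addFile`, `SparkFiles.get`, and `mapPartitions` are standard PySpark APIs). The helper names `FIELDS`, `enrich_rows`, and `enrich_partition` are hypothetical, all fields are assumed to be strings, and any speedup over the UDF version is something to measure rather than assume. The pure-Python part accepts any object with a `.get(ip)` method, so it can be tested without Spark or a real MMDB file.

```python
FIELDS = ["country", "country_code", "continent", "continent_code",
          "asn", "as_name", "as_domain"]

def enrich_rows(ips, reader):
    """Yield (ip, *fields) tuples; `reader` is anything with .get(ip) -> dict or None."""
    for ip in ips:
        try:
            data = reader.get(ip) or {}
        except (TypeError, ValueError):
            data = {}  # malformed or missing IP: emit null fields
        yield (ip, *[data.get(f) for f in FIELDS])

def enrich_partition(rows, mmdb_name="ipinfo_lite.mmdb"):
    # Runs on the executor: open the MMDB once per partition, not once per row.
    import maxminddb
    from pyspark import SparkFiles
    with maxminddb.open_database(SparkFiles.get(mmdb_name)) as reader:
        yield from enrich_rows((row["IP"] for row in rows), reader)

def main():
    from pyspark.sql import SparkSession
    spark = (SparkSession.builder.appName("IPinfoLiteEnrichment")
             .master("local[*]").getOrCreate())
    spark.sparkContext.addFile("ipinfo_lite.mmdb")  # copied to every executor
    df = spark.read.csv("ips.csv", header=True)
    schema = "IP: string, " + ", ".join(f"{f}: string" for f in FIELDS)
    df_final = df.rdd.mapPartitions(enrich_partition).toDF(schema)
    df_final.show(10, truncate=False)
```

Call `main()` from your driver script. Missing fields come back as null rather than empty strings, which is usually easier to filter in SQL; that is a design choice, not a requirement.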
In-Spark enrichment via join (not recommended)

As far as I know, Spark has no native IP geolocation lookup and no IP address data type, so you have to enrich the data through a join on IP ranges. That is not very efficient. With a CIDR-based IP database you first have to compute each network's start_ip and end_ip, and the resulting join condition (start_ip <= ip <= end_ip) is a range rather than an equality, so Spark cannot use a hash or sort-merge join and typically falls back to a nested-loop join. Snowflake has the same limitation, which is why we export a custom dataset for it with start_ip, end_ip, and join_key.
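To make the start_ip/end_ip step concrete, here is a minimal sketch using only Python's standard `ipaddress` module (the `cidr_bounds` and `fits_in_long` names are hypothetical). It also shows a pitfall for any integer-based join: IPv6 bounds do not fit in Spark's signed 64-bit `LongType`.

```python
import ipaddress

LONG_MAX = 2**63 - 1  # largest value Spark's LongType (signed 64-bit) can hold

def cidr_bounds(cidr):
    """Return (start_ip, end_ip) as Python ints for an IPv4 or IPv6 CIDR."""
    net = ipaddress.ip_network(cidr, strict=False)
    return int(net.network_address), int(net.broadcast_address)

def fits_in_long(cidr):
    """True if the range's upper bound fits in a signed 64-bit integer."""
    _, end = cidr_bounds(cidr)
    return end <= LONG_MAX

for cidr in ["8.8.8.0/24", "2001:db8::/32"]:
    start, end = cidr_bounds(cidr)
    print(cidr, start, end, fits_in_long(cidr))
```

IPv4 bounds always fit (they are below 2**32), while most IPv6 bounds do not, so IPv6 ranges need a different representation, such as fixed-width hex strings or a split high/low pair of columns.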

As a result, enriching data inside the platform through IP address joins is going to be inefficient. Also, CSV may not be the best format for the IP database; you can use Parquet instead.
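For comparison, if the ranges are sorted and non-overlapping (an assumption about the data), each IP can be matched with a binary search instead of being compared against every range. A minimal standard-library sketch; `RangeIndex` is a hypothetical helper, not part of Spark or IPinfo's tooling.

```python
import bisect
import ipaddress

class RangeIndex:
    """Binary-search lookup over sorted, non-overlapping [start, end] ranges."""

    def __init__(self, rows):
        # rows: iterable of (cidr, record). Keep IPv4 and IPv6 apart because
        # their integer values overlap numerically.
        by_version = {4: [], 6: []}
        for cidr, record in rows:
            net = ipaddress.ip_network(cidr, strict=False)
            by_version[net.version].append(
                (int(net.network_address), int(net.broadcast_address), record))
        self.tables = {}
        for version, ranges in by_version.items():
            ranges.sort(key=lambda r: r[0])  # sort once by start address
            self.tables[version] = ([r[0] for r in ranges], ranges)

    def lookup(self, ip):
        addr = ipaddress.ip_address(ip)
        starts, ranges = self.tables[addr.version]
        value = int(addr)
        # Rightmost range whose start <= value: O(log n) per IP.
        i = bisect.bisect_right(starts, value) - 1
        if i >= 0 and value <= ranges[i][1]:
            return ranges[i][2]
        return None
```

In Spark terms, this is roughly what you get by broadcasting a sorted range table and doing the lookup inside a UDF or `mapPartitions`, rather than expressing the enrichment as a join.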

That said, here is my code. Let me know if you can suggest any improvements:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import ipaddress

def create_spark_session():
    return SparkSession.builder \
        .appName("IPinfoLiteEnrichment") \
        .master("local[*]") \
        .getOrCreate()

def load_data(ip_file, cidr_file):
    df_ips = spark.read.csv(ip_file, header=True, inferSchema=True)
    df_ipinfo = spark.read.csv(cidr_file, header=True, inferSchema=True)
    return df_ips, df_ipinfo

def addr_to_key(addr):
    # IPv6 values need 128 bits and overflow Spark's 64-bit LongType, so encode
    # every address as a 32-char zero-padded hex string; comparing fixed-width
    # hex strings gives the same order as comparing the numbers. IPv4 is mapped
    # into ::ffff:0:0/96 so it cannot collide with IPv6 ranges.
    value = int(addr)
    if addr.version == 4:
        value |= 0xFFFF << 32
    return format(value, "032x")

def ip_to_key(ip):
    try:
        return addr_to_key(ipaddress.ip_address(ip))
    except (TypeError, ValueError):
        return None  # malformed or missing IP

def cidr_to_range(cidr):
    try:
        net = ipaddress.ip_network(cidr, strict=False)
        return addr_to_key(net.network_address), addr_to_key(net.broadcast_address)
    except (TypeError, ValueError):
        return None, None

def main():
    ip_file = "ips.csv"
    cidr_file = "ipinfo_lite.csv"

    global spark
    spark = create_spark_session()

    df_ips, df_ipinfo = load_data(ip_file, cidr_file)

    ip_key_udf = udf(ip_to_key, StringType())

    @udf("struct<start:string,end:string>")
    def cidr_range_udf(cidr):
        start, end = cidr_to_range(cidr)
        return {"start": start, "end": end}

    df_ipinfo_ranges = df_ipinfo.withColumn("range", cidr_range_udf(col("network"))) \
        .withColumn("start", col("range.start")) \
        .withColumn("end", col("range.end")) \
        .drop("range")

    df_ips_keyed = df_ips.withColumn("ip_key", ip_key_udf(col("IP")))

    # Range (non-equi) join: this is the slow part of this approach.
    df_enriched = df_ips_keyed.join(
        df_ipinfo_ranges,
        (df_ips_keyed["ip_key"] >= df_ipinfo_ranges["start"]) &
        (df_ips_keyed["ip_key"] <= df_ipinfo_ranges["end"]),
        how="left"
    )

    # Optionally save to a Spark table or file
    spark.sql("CREATE DATABASE IF NOT EXISTS localdb")
    df_enriched.write.mode("overwrite").saveAsTable("localdb.ip_enriched")

    spark.sql("SELECT IP, country, asn, as_name FROM localdb.ip_enriched LIMIT 10").show(truncate=False)

if __name__ == "__main__":
    main()

The MMDB approach was very fast, as expected, but the CSV join-based enrichment was slow. Parquet would probably be somewhat faster than CSV, but that only speeds up ingesting the IP database itself, not the range join.

If you really need in-Spark enrichment, we can deliver our Snowflake dataset as a data download, with join_key, start_ip, and end_ip already converted to int/hex. This will help with join performance. However, writing the custom Spark UDFs and join logic would require some engineering effort on your end.
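To show how a join key can help, here is a sketch under an explicit assumption: the key is the top 16 bits of an IPv4 address (its /16 prefix). The actual join_key in the export may be defined differently. Each range is exploded into every key it spans, so the join gains an equality on the key (which Spark can hash or sort-merge) plus a range filter on the few candidates that share it. The helpers `ipv4_int`, `ip_key`, `range_keys`, and `bucketed_join` and the 16-bit width are hypothetical; the Spark part assumes IPv4-only input with `ip_int`, `start`, and `end` as integer columns.

```python
import ipaddress

KEY_BITS = 16  # hypothetical: bucket IPv4 addresses by their /16 prefix

def ipv4_int(ip):
    """IPv4 address string -> int."""
    return int(ipaddress.IPv4Address(ip))

def ip_key(ip_int):
    """Join key for an IPv4 address stored as an int."""
    return ip_int >> (32 - KEY_BITS)

def range_keys(start, end):
    """Every join key that an IPv4 [start, end] range overlaps."""
    return list(range(ip_key(start), ip_key(end) + 1))

def bucketed_join(df_ips, df_ranges):
    """df_ips has ip_int; df_ranges has start/end (IPv4 only, integer columns)."""
    from pyspark.sql import functions as F
    shift = 32 - KEY_BITS
    ips = df_ips.withColumn("join_key", F.expr(f"shiftright(ip_int, {shift})"))
    ranges = df_ranges.withColumn(
        "join_key",
        F.expr(f"explode(sequence(shiftright(`start`, {shift}), shiftright(`end`, {shift})))"),
    )
    cond = (
        (ips["join_key"] == ranges["join_key"])  # equality part: hash/sort-merge join
        & (ips["ip_int"] >= ranges["start"])     # range part: filters the candidates
        & (ips["ip_int"] <= ranges["end"])
    )
    return ips.join(ranges, cond, "left").drop(ranges["join_key"])
```

The key width is a trade-off: a /8 range explodes into 256 rows with 16-bit keys, so wider keys duplicate more range rows, while narrower keys leave more candidates for each IP to filter.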

Whatever data platform you choose, MMDB's lookup performance is hard to beat. So focus on enrichment via the MMDB database first, and then evaluate ingestion options for your data platform.