Monitor network traffic with ClickHouse, GoFlow2 and IPinfo

NetFlow is a protocol introduced by Cisco to collect IP traffic statistics. Packets with the same source and destination addresses and ports are grouped into flows. For each flow, the router counts the number of packets and bytes forwarded and periodically sends these counters to a collector.

In this post we will see how to collect NetFlow data with GoFlow2, ingest it into ClickHouse via Apache Kafka, and enrich it with IPinfo’s Free Country + ASN database.

Set up services

First we need to set up the various services. Copy the compose.yaml file below and create the stack with the following command:

docker compose up -d

You should see the following output:

[+] Running 4/4
 βœ” Network ipinfo-netflow-example_default         Created           0.0s
 βœ” Container ipinfo-netflow-example-kafka-1       Started           0.2s
 βœ” Container ipinfo-netflow-example-clickhouse-1  Started           0.3s
 βœ” Container ipinfo-netflow-example-goflow2-1     Started           0.3s
compose.yaml
services:
  clickhouse:
    image: docker.io/clickhouse/clickhouse-server:25.2
    environment:
      # Allow network access for default user
      CLICKHOUSE_SKIP_USER_SETUP: 1
    depends_on:
      - kafka
    ports:
      - 9000:9000
  goflow2:
    image: ghcr.io/netsampler/goflow2:v2.2.2
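    # GoFlow2 listens for NetFlow on UDP 2055 by default and, with the flags
    # below, publishes each flow as a JSON message to the Kafka "netflow" topic.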
    command:
      - -transport=kafka
      - -transport.kafka.brokers=kafka:9092
      - -transport.kafka.topic=netflow
    depends_on:
      - kafka
    ports:
      - 2055:2055/udp
    restart: on-failure
  kafka:
    image: docker.io/apache/kafka:3.9.0
    # See https://docs.docker.com/guides/kafka/
    environment:
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://kafka:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Load IPinfo data

To enrich flows with ASN and IP geolocation information we will load IPinfo’s Free Country + ASN database into ClickHouse with mmdb-to-clickhouse. For more information see Lookup IP geolocation and ASN with ClickHouse and IPinfo's free database.

Follow the instructions on GitHub to install mmdb-to-clickhouse and then download the database and insert it:

curl -Lo country_asn.mmdb "https://ipinfo.io/data/free/country_asn.mmdb?token=$YOUR_IPINFO_TOKEN"
./mmdb-to-clickhouse -dsn clickhouse://localhost:9000 -mmdb country_asn.mmdb -name country_asn -test

You should see the following output:

2025/03/09 19:18:15 Net schema: `network` String, `pointer` UInt64, `partition` Date
2025/03/09 19:18:15 Val schema: `pointer` UInt32, `as_domain` String, `as_name` String, `asn` String, `continent` String, `continent_name` String, `country` String, `country_name` String, `partition` Date
2025/03/09 19:18:15 Creating table country_asn_net_history
2025/03/09 19:18:15 Creating table country_asn_val_history
2025/03/09 19:18:15 Creating dictionary country_asn_net
2025/03/09 19:18:15 Creating dictionary country_asn_val
2025/03/09 19:18:15 Dropping partition 2025-03-09
2025/03/09 19:18:15 Dropping partition 2025-03-09
2025/03/09 19:18:15 Inserting data
2025/03/09 19:18:17 Inserted 1000000 networks and 70273 values
2025/03/09 19:18:19 Inserted 2000000 networks and 102950 values
2025/03/09 19:18:21 Inserted 3000000 networks and 109310 values
2025/03/09 19:18:23 Inserted 4000000 networks and 111560 values
2025/03/09 19:18:25 Inserted 5000000 networks and 114142 values
2025/03/09 19:18:25 Inserted 5457187 networks and 118882 values
2025/03/09 19:18:25 Creating function country_asn
2025/03/09 19:18:25 Running test query: SELECT country_asn('1.1.1.1', 'as_domain')
2025/03/09 19:18:25 This may take some time as the dictionnary gets loaded in memory
2025/03/09 19:18:36 Test query result: cloudflare.com
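
The -test flag already exercised one lookup. You can also query the generated country_asn function yourself, either with a single attribute name or with a tuple of attributes (the names come from the Val schema logged above):

-- Look up a single attribute for an IP address:
SELECT country_asn('1.1.1.1', 'country');
-- Or several attributes at once by passing a tuple of names:
SELECT country_asn('1.1.1.1', ('asn', 'as_name'));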

Create the database tables

We then need to create the database tables that move and store the flows. Three are needed:

  • netflow_queue: reads the raw JSON messages from Kafka (Kafka table engine)
  • netflow_mv: a materialized view that consumes rows from netflow_queue, enriches them, and inserts them into netflow
  • netflow: stores the enriched flows on disk (MergeTree engine)

Let’s open the ClickHouse REPL:

docker compose exec -it clickhouse clickhouse client

And issue the following queries to create the tables:

CREATE TABLE netflow_queue
(
    `time_received_ns` UInt64,
    `time_flow_start_ns` UInt64,
    `time_flow_end_ns` UInt64,
    `bytes` UInt64,
    `packets` UInt64,
    `src_addr` String,
    `dst_addr` String,
    `etype` String,
    `proto` String,
    `src_port` UInt64,
    `dst_port` UInt64,
    -- NOTE: More fields are available, see https://github.com/netsampler/goflow2/blob/main/pb/flow.proto.
)
ENGINE = Kafka('kafka:9092', 'netflow', 'clickhouse', 'JSONEachRow');
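-- The netflow table stores the enriched flows on disk; the nanosecond
-- integer timestamps are converted to DateTime64(9) by the materialized
-- view below.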
CREATE TABLE netflow
(
    `time_received_ns` DateTime64(9),
    `time_flow_start_ns` DateTime64(9),
    `time_flow_end_ns` DateTime64(9),
    `bytes` UInt64,
    `packets` UInt64,
    `src_addr` String,
    `dst_addr` String,
    `etype` String,
    `proto` String,
    `src_port` UInt64,
    `dst_port` UInt64,
    `src_asn` String,
    `src_as_name` String,
    `src_as_domain` String,
    `src_country` String,
    `dst_asn` String,
    `dst_as_name` String,
    `dst_as_domain` String,
    `dst_country` String
)
ENGINE = MergeTree()
ORDER BY time_received_ns;
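-- The materialized view runs on every batch consumed from Kafka: it looks
-- up the IPinfo attributes for both addresses and writes the enriched rows
-- into netflow.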
CREATE MATERIALIZED VIEW netflow_mv TO netflow
AS SELECT *
    EXCEPT (src_info, dst_info)
    REPLACE (
        toDateTime64(time_received_ns / 1e9, 9) AS time_received_ns,
        toDateTime64(time_flow_start_ns / 1e9, 9) AS time_flow_start_ns,
        toDateTime64(time_flow_end_ns / 1e9, 9) AS time_flow_end_ns
    ),
    src_info.1 src_asn,
    src_info.2 src_as_name,
    src_info.3 src_as_domain,
    src_info.4 src_country,
    dst_info.1 dst_asn,
    dst_info.2 dst_as_name,
    dst_info.3 dst_as_domain,
    dst_info.4 dst_country
FROM (
    SELECT *,
        -- Lookup ASN and IP geolocation from the Country + ASN database.
        -- This uses an IP_TRIE internally so this is very fast. 
        country_asn(src_addr, ('asn', 'as_name', 'as_domain', 'country')) src_info,
        country_asn(dst_addr, ('asn', 'as_name', 'as_domain', 'country')) dst_info
    FROM netflow_queue
);
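
No rows will arrive until a router actually exports flows (see the next section). Once that happens, ingestion is easy to check, and if the netflow table stays empty, recent ClickHouse releases also expose a system.kafka_consumers table reporting per-consumer state and exceptions (a quick sanity check, assuming your version has it):

-- Rows should start appearing shortly after flows are exported:
SELECT count(), max(time_received_ns) FROM netflow;
-- If the table stays empty, inspect the Kafka consumer state:
SELECT * FROM system.kafka_consumers FORMAT Vertical;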

Set up NetFlow

Now we need to actually collect NetFlow data. I use a MikroTik router at home, on which I can set up NetFlow export like this:

/ip/traffic-flow/set enabled=yes interfaces=all
/ip/traffic-flow/target/add dst-address=X.X.X.X port=2055 version=9

Where X.X.X.X is the IP address of the machine where goflow2 runs.

If you use a different router, refer to its manufacturer's instructions for setting up NetFlow.

Run queries

At this point we should have NetFlow data flowing from the router to GoFlow2, into Kafka, and finally to ClickHouse:

SELECT * FROM netflow LIMIT 1 FORMAT Vertical
time_received_ns:   2025-03-09 18:03:01.841665536
time_flow_start_ns: 2025-03-09 18:02:42.840000000
time_flow_end_ns:   2025-03-09 18:02:44.860000000
bytes:              536
packets:            7
src_addr:           2a01:4f8:2b03:4ce::2
dst_addr:           2a01:cb08:9155:xxxx:xxxx:xxxx:xxxx:xxxx # My NAS IP (anonymized)
etype:              IPv6
proto:              TCP
src_port:           443
dst_port:           35764
# The following columns come from the Country + ASN database:
src_asn:            AS24940
src_as_name:        Hetzner Online GmbH
src_as_domain:      hetzner.de
src_country:        DE
dst_asn:            AS3215
dst_as_name:        Orange S.A. # My ISP
dst_as_domain:      orange.com
dst_country:        FR

Let’s compute some aggregated statistics.

Traffic by IP geolocation country

SELECT
    src_country,
    dst_country,
    formatReadableSize(SUM(bytes)) AS total_bytes
FROM netflow
WHERE src_country != ''
GROUP BY ALL
ORDER BY SUM(bytes) DESC
LIMIT 10

    β”Œβ”€src_country─┬─dst_country─┬─total_bytes─┐
 1. β”‚ FR          β”‚ DE          β”‚ 20.11 GiB   β”‚
 2. β”‚ DE          β”‚ FR          β”‚ 376.46 MiB  β”‚
 3. β”‚ US          β”‚ FR          β”‚ 6.84 MiB    β”‚
 4. β”‚ LV          β”‚ FR          β”‚ 3.18 MiB    β”‚
 5. β”‚ FR          β”‚ CA          β”‚ 2.45 MiB    β”‚
 6. β”‚ FR          β”‚ FR          β”‚ 1.58 MiB    β”‚
 7. β”‚ FR          β”‚ US          β”‚ 1.53 MiB    β”‚
 8. β”‚ FR          β”‚ NL          β”‚ 874.81 KiB  β”‚
 9. β”‚ CA          β”‚ FR          β”‚ 832.17 KiB  β”‚
10. β”‚ NL          β”‚ FR          β”‚ 693.66 KiB  β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Traffic by ASN

SELECT
    src_as_name,
    dst_as_name,
    formatReadableSize(SUM(bytes)) AS total_bytes
FROM netflow
WHERE src_as_name != ''
GROUP BY ALL
ORDER BY SUM(bytes) DESC
LIMIT 10
    β”Œβ”€src_as_name─────────┬─dst_as_name────────────────────────────────┬─total_bytes─┐
 1. β”‚ Orange S.A.         β”‚ Hetzner Online GmbH                        β”‚ 18.76 GiB   β”‚
 2. β”‚ Hetzner Online GmbH β”‚ Orange S.A.                                β”‚ 347.90 MiB  β”‚
 3. β”‚ Mikrotikls SIA      β”‚ Orange S.A.                                β”‚ 3.18 MiB    β”‚
 4. β”‚ Fastmail Pty Ltd    β”‚ Orange S.A.                                β”‚ 2.00 MiB    β”‚
 5. β”‚ Orange S.A.         β”‚ Civilized Discourse Construction Kit, Inc. β”‚ 1.75 MiB    β”‚
 6. β”‚ Amazon.com, Inc.    β”‚ Orange S.A.                                β”‚ 1.48 MiB    β”‚
 7. β”‚ Fastly, Inc.        β”‚ Orange S.A.                                β”‚ 1.41 MiB    β”‚
 8. β”‚ Google LLC          β”‚ Orange S.A.                                β”‚ 1.37 MiB    β”‚
 9. β”‚ Cloudflare, Inc.    β”‚ Orange S.A.                                β”‚ 1.29 MiB    β”‚
10. β”‚ Orange S.A.         β”‚ Cloudflare, Inc.                           β”‚ 915.95 KiB  β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Cleanup

Run the following command to stop the services and delete the data:

docker compose down