EDITING BOARD
RO
EN
×
▼ BROWSE ISSUES ▼
Issue 62

OpenStreetMap in the age of Spark

Adrian Bona
adrian.bona@telenav.com



PROGRAMMING

OpenStreetMap is an open/collaborative map considered the Wikipedia of maps. There are tons of information available online and even books about it... Anyway, it is not the history of OSM that we will discuss here. We will rather take a look at the impressive dataset behind and how such a big map can be analysed with modern technologies as Apache Spark.

Short anatomy of the OpenStreetMap

One of the reasons OSM got where it is now, is that it's quite a simple data model. There are only three types of entities:

Each entity can be tagged (for example, a way tagged with the highway=residential marks that the way is a road and the road type is residential).

In addition, meta information about who and when each entity was added is kept.

How big is OpenStreetMap actually?

The data is available at http://planet.openstreetmap.org and it comes in two formats:

The OSMstats project highlights how the map evolves on a daily basis.

Is OpenStreetMap Big Data ready?

Getting started with OpenStreetMap at large scale (the entire planet) can be painful. A few years ago, we were a bit intrigued to see people waiting hours or even days to get a piece of OSM imported in PostgreSQL on huge machines. But we said OK ... this is not Big Data.

Meanwhile, we started to work on various geo-spatial analyses involving technologies from a Big Data stack, where OSM was used and we were again intrigued, as the regular way to handle the OSM data was to run osmosis over the huge PBF planet file and to dump some CSV files for various scenarios. Even if this works, it's sub-optimal, and so we wrote an OSM converter to a big data friendly columnar format called Parquet.

The converter is available at github.com/adrianulbona/osm-parquetizer.

Hopefully, this will make the valuable work of so many OSM contributors easily available for the Big Data world.

How fast is the conversion?

It is less than a minute for romania-latest.osm.pbf and \~3 hours (on a decent laptop with SSD) for the planet-latest.osm.pbf.

Where is Spark in this story?

The converter mentioned above takes one file, and not only does it convert the data, but it also splits it in three files, one for each OSM entity type - each file basically represents a collection of structured data (a table). The schemas of the tables are the following:

node
 |-- id: long
 |-- version: integer
 |-- timestamp: long
 |-- changeset: long
 |-- uid: integer
 |-- user_sid: string
 |-- tags: array
 |    |-- element: struct
 |    |    |-- key: string
 |    |    |-- value: string
 |-- latitude: double
 |-- longitude: double

way
 |-- id: long
 |-- version: integer
 |-- timestamp: long
 |-- changeset: long
 |-- uid: integer
 |-- user_sid: string
 |-- tags: array
 |    |-- element: struct
 |    |    |-- key: string
 |    |    |-- value: string
 |-- nodes: array
 |    |-- element: struct
 |    |    |-- index: integer
 |    |    |-- nodeId: long

relation
 |-- id: long
 |-- version: integer
 |-- timestamp: long
 |-- changeset: long
 |-- uid: integer
 |-- user_sid: string
 |-- tags: array
 |    |-- element: struct
 |    |    |-- key: string
 |    |    |-- value: string
 |-- members: array
 |    |-- element: struct
 |    |    |-- id: long
 |    |    |-- role: string
 |    |    |-- type: string

Loading the data in Apache Spark now becomes extremely convenient:

val nodeDF = sqlContext.read.parquet("romania-latest.osm.pbf.node.parquet")
nodeDF.createOrReplaceTempView("nodes")

val wayDF = sqlContext.read.parquet("romania-latest.osm.pbf.way.parquet")
wayDF.createOrReplaceTempView("ways")

val relationDF = sqlContext.read.parquet("romania-latest.osm.pbf.relation.parquet")
relationDF.createOrReplaceTempView("relations")

From this point on, the Spark world opens and we could either play around with DataFrames or use the beloved SQL that we all know. Let us consider the following task:

For the most active OSM contributors, highlight the distribution of their work over time.

The DataFrames API solution looks like:

val nodeDF = nodeDF
    .withColumn("created_at", ($"timestamp" / 1000).cast(TimestampType))
    .createOrReplaceTempView("nodes")

val top10Users = nodeDF.groupBy("user_sid")
    .agg(count($"id").as("node_count"))
    .orderBy($"node_count".desc)
    .limit(10)
    .collect
    .map({ case Row(user_sid: String, _) => user_sid })

nodeDF.filter($"user_sid".in(top10Users: _*))
    .groupBy($"user_sid", year($"created_at").as("year"))
    .agg(count("id").as("node_count"))
    .orderBy($"year")
    .registerTempTable("top10UsersOverTime")

The Spark SQL solution looks like:

select 
    user_sid, 
    year(created_at)) as year,
    count(*) as node_count
from 
    nodes
where 
    user_sid in (
        select user_sid from (
            select 
                user_sid, 
                count(*) as c 
            from 
                nodes 
            group by 
                user_sid 
            order by 
                c desc 
            limit 10
        )
    )
group by 
    user_sid, 
    year(created_at)
order by 
    year

Both solutions are equivalent, and give the following results:

Even if we touched only a tiny piece of OSM, there is nothing to stop us from analyzing and getting valuable insights from it, in scalable way.

VIDEO: ISSUE 109 LAUNCH EVENT

Sponsors

  • Accenture
  • BT Code Crafters
  • Bosch
  • Betfair
  • MHP
  • BoatyardX
  • .msg systems
  • P3 group
  • Ing Hubs
  • Cognizant Softvision
  • Colors in projects

VIDEO: ISSUE 109 LAUNCH EVENT