OpenStreetMap (OSM) is an open, collaborative map, often described as the Wikipedia of maps. There is plenty of information about it available online, and even entire books, but it is not the history of OSM that we will discuss here. We will rather take a look at the impressive dataset behind it and at how such a big map can be analysed with modern technologies such as Apache Spark.
One of the reasons OSM got where it is now is that it has quite a simple data model. There are only three types of entities:
nodes: geo-localized points on the map
ways: ordered collections of nodes - roads, building contours, borders, ...
relations: groups of nodes, ways and other relations - for example, bus routes or administrative boundaries
Each entity can be tagged (for example, a way tagged with highway=residential is a road of type residential).
In addition, meta-information about who added each entity and when is kept.
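To make the model concrete, here is a minimal sketch of it as Scala case classes; the names are ours, chosen purely for illustration (the actual Parquet schemas are shown later):

case class Tag(key: String, value: String)

// a node is the only entity carrying coordinates
case class Node(id: Long, latitude: Double, longitude: Double, tags: Seq[Tag])

// a way is an ordered list of node references
case class Way(id: Long, nodeIds: Seq[Long], tags: Seq[Tag])

// a relation groups nodes, ways or other relations, each member playing a role
case class Member(id: Long, memberType: String, role: String)
case class Relation(id: Long, members: Seq[Member], tags: Seq[Tag])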
The data is available at http://planet.openstreetmap.org and it comes in two formats:
XML ~ around 53 GB
PBF ~ a considerably more compact binary format (the one used throughout this article)
The OSMstats project highlights how the map evolves on a daily basis.
Getting started with OpenStreetMap at large scale (the entire planet) can be painful. A few years ago, we were a bit intrigued to see people waiting hours or even days to get a piece of OSM imported into PostgreSQL on huge machines. But we told ourselves: OK, this is not Big Data anyway.
Meanwhile, we started to work on various geo-spatial analyses involving technologies from the Big Data stack, some of them using OSM, and we were intrigued again: the usual way to handle the OSM data was to run osmosis over the huge PBF planet file and dump CSV files for each scenario. Even if this works, it is sub-optimal, so we wrote a converter from OSM to Parquet, a Big Data friendly columnar format.
The converter is available at github.com/adrianulbona/osm-parquetizer.
Hopefully, this will make the valuable work of so many OSM contributors easily available for the Big Data world.
The conversion takes less than a minute for romania-latest.osm.pbf and ~3 hours (on a decent laptop with an SSD) for planet-latest.osm.pbf.
The converter mentioned above takes a single PBF file and not only converts the data, but also splits it into three files, one for each OSM entity type - each file is essentially a collection of structured data (a table). The schemas of the tables are the following:
node
|-- id: long
|-- version: integer
|-- timestamp: long
|-- changeset: long
|-- uid: integer
|-- user_sid: string
|-- tags: array
| |-- element: struct
| | |-- key: string
| | |-- value: string
|-- latitude: double
|-- longitude: double
way
|-- id: long
|-- version: integer
|-- timestamp: long
|-- changeset: long
|-- uid: integer
|-- user_sid: string
|-- tags: array
| |-- element: struct
| | |-- key: string
| | |-- value: string
|-- nodes: array
| |-- element: struct
| | |-- index: integer
| | |-- nodeId: long
relation
|-- id: long
|-- version: integer
|-- timestamp: long
|-- changeset: long
|-- uid: integer
|-- user_sid: string
|-- tags: array
| |-- element: struct
| | |-- key: string
| | |-- value: string
|-- members: array
| |-- element: struct
| | |-- id: long
| | |-- role: string
| | |-- type: string
Loading the data in Apache Spark now becomes extremely convenient:
val nodeDF = sqlContext.read.parquet("romania-latest.osm.pbf.node.parquet")
nodeDF.createOrReplaceTempView("nodes")
val wayDF = sqlContext.read.parquet("romania-latest.osm.pbf.way.parquet")
wayDF.createOrReplaceTempView("ways")
val relationDF = sqlContext.read.parquet("romania-latest.osm.pbf.relation.parquet")
relationDF.createOrReplaceTempView("relations")
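As a quick sanity check, the tags array can already be queried; the following sketch (assuming a Spark shell, where the $ column syntax is in scope) counts the ways tagged as residential roads:

import org.apache.spark.sql.functions.explode

// explode the tags array so that each (way, tag) pair becomes a row,
// then keep only the highway=residential ones
val residentialWayCount = wayDF
  .select($"id", explode($"tags").as("tag"))
  .filter($"tag.key" === "highway" && $"tag.value" === "residential")
  .count()

println(residentialWayCount)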
From this point on, the Spark world opens up and we can either play around with DataFrames or use the good old SQL we all know. Let us consider the following task:
For the most active OSM contributors, highlight the distribution of their work over time.
The DataFrames API solution looks like:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import sqlContext.implicits._ // for the $ column syntax (already in scope in spark-shell)

// the raw timestamp is in milliseconds; derive a proper timestamp column
val enrichedNodeDF = nodeDF
  .withColumn("created_at", ($"timestamp" / 1000).cast(TimestampType))
enrichedNodeDF.createOrReplaceTempView("nodes")

// the ten contributors with the most created nodes
val top10Users = enrichedNodeDF.groupBy("user_sid")
  .agg(count($"id").as("node_count"))
  .orderBy($"node_count".desc)
  .limit(10)
  .collect
  .map({ case Row(user_sid: String, _) => user_sid })

// their yearly node counts
enrichedNodeDF.filter($"user_sid".isin(top10Users: _*))
  .groupBy($"user_sid", year($"created_at").as("year"))
  .agg(count("id").as("node_count"))
  .orderBy($"year")
  .createOrReplaceTempView("top10UsersOverTime")
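The last step only registers a temporary view; to actually look at the numbers one could, for instance, query it back (a sketch, reusing the same sqlContext):

// peek at the yearly node counts of the top contributors
sqlContext.sql("select * from top10UsersOverTime").show(100)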
The Spark SQL solution looks like:
select
user_sid,
year(created_at) as year,
count(*) as node_count
from
nodes
where
user_sid in (
select user_sid from (
select
user_sid,
count(*) as c
from
nodes
group by
user_sid
order by
c desc
limit 10
) top_users
)
group by
user_sid,
year(created_at)
order by
year
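The query above runs against the nodes view registered earlier (the one enriched with created_at); a minimal way to execute it from the same Scala session is sketched below - the top_users alias and the variable name are ours:

val top10UsersOverTimeDF = sqlContext.sql("""
  select user_sid, year(created_at) as year, count(*) as node_count
  from nodes
  where user_sid in (
    select user_sid from (
      select user_sid, count(*) as c
      from nodes
      group by user_sid
      order by c desc
      limit 10
    ) top_users
  )
  group by user_sid, year(created_at)
  order by year
""")

top10UsersOverTimeDF.show(100)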
Both solutions are equivalent and produce the same result: for each of the top contributors, the number of nodes they created per year.
Even though we have touched only a tiny piece of OSM, there is nothing to stop us from analyzing it further and extracting valuable insights from it, in a scalable way.