Following on from part 1 of this mini series, I've got my local environment ready to go and I've pulled down some test data to work with.

The next step is to start having a look at some of the data.

Loading in the data

We know that the data is in csv format, so we can use the Spark read functionality to bring it in. With the single file in this local environment it's a case of:

# read the csv, treating the first row as a header and letting Spark infer the column types
data = spark.read.csv('01aJourneyDataExtract10Jan16-23Jan16.csv', header=True, inferSchema=True)

# strip the spaces out of the column names so they're easier to reference
for col in data.columns:
    data = data.withColumnRenamed(col, col.replace(" ", ""))

This will create a DataFrame called data and load the csv input into it; the loop then strips the spaces out of the column names so they're easier to reference. By setting header to True we are saying that the first row of the data is a header row.

inferSchema will ask Spark to have a go at working out the correct types for the columns that are being brought in.
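
To see how that came out, we can take a quick look at the columns and the schema Spark settled on (a quick check, assuming the column names above):

# column names after the space-stripping rename
print(data.columns)

# the types Spark inferred for each column
data.printSchema()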

Quick Cleanup

Even though inferSchema was used, if we call data.describe() we can see that the dates have come through as strings. I'm going to put that down to the fact that these dates are in UK format.

DataFrame[description: string
, RentalId: string
, Duration: string
, BikeId: string
, EndDate: string
, EndStationId: string
, EndStationName: string
, StartDate: string
, StartStationId: string
, StartStationName: string]

I think I’m going to want these to be dates later on, so I’m going to convert them to timestamps now.

from pyspark.sql.functions import unix_timestamp
dated_data = data.select('RentalId' \
           ,unix_timestamp('StartDate', 'dd/MM/yyyy HH:mm').cast("double").cast("timestamp").alias('StartDate') \
           ,unix_timestamp('EndDate', 'dd/MM/yyyy HH:mm').cast("double").cast("timestamp").alias('EndDate') \
           ,'Duration' \
           ,'StartStationId' \
           ,'EndStationId')

This block uses the unix_timestamp function to get the long number representation of the date, which we can then cast to the timestamp type. By passing the format of the date we can solve the issue of it being in a format that inferSchema wasn't expecting. I've used .alias() to specify the name of each derived column.
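
As an aside, on Spark 2.2 or later the same conversion can be done a little more directly with to_timestamp, which parses straight to the timestamp type without the double cast. A sketch, assuming the same column names:

from pyspark.sql.functions import to_timestamp

# parse the UK-formatted dates straight to timestamps (Spark 2.2+)
dated_data = data.select('RentalId',
                         to_timestamp('StartDate', 'dd/MM/yyyy HH:mm').alias('StartDate'),
                         to_timestamp('EndDate', 'dd/MM/yyyy HH:mm').alias('EndDate'),
                         'Duration',
                         'StartStationId',
                         'EndStationId')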

Getting the StationId Data

There is an API which we can use to get additional data for the StartStationId and EndStationId values; it can be found on the TfL website.

We need a list of all the start and end bike points/stations that appear in the dataset, so I went for a union to get this.

stationids = sorted((data.select('StartStationId') \
                    .union(data.select('EndStationId'))) \
                    .rdd.map(lambda r: r[0]) \
                    .distinct() \
                    .collect())

This will return a sorted list of all the Ids in the dataset, which can be passed into a helper method that calls the API mentioned above.

import csv
import json
import os

import requests

def get_bike_points(points_ids):
    # expanduser is needed because open() won't expand '~' itself
    bike_point_file = os.path.expanduser('~/datasets/cycling/bike_points.csv')
    base_url = 'https://api.tfl.gov.uk/BikePoint/BikePoints_'

    with open(bike_point_file, 'w') as csvfile:
        fieldnames = ['pointId', 'commonName', 'lat', 'lon', 'placeType', 'properties']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for point in points_ids:
            if point is None:
                continue
            url = "%s%s" % (base_url, point)
            req = requests.get(url)
            if req.status_code != 200:
                # station no longer exists - write a placeholder row to fix up later
                writer.writerow({"pointId": point, "commonName": "Not Found"})
            else:
                bike_point = json.loads(req.text)
                props = {}
                if 'additionalProperties' in bike_point:
                    for p in bike_point['additionalProperties']:
                        props[p['key']] = p['value']
                writer.writerow({"pointId": point, "commonName": bike_point['commonName'], "lat": bike_point['lat'], \
                                "lon": bike_point['lon'], "placeType": bike_point['placeType'], 'properties': props})

This block takes the list of Ids, collects the data for each bike station, extracts what is wanted from the returned data, then saves it all into a csv file.
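
Running it is then just a case of passing in the list of Ids built earlier. It makes one request per Id, so it takes a little while:

# pull down the details for every station id found in the journey data
get_bike_points(stationids)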

Cleaning up the StationId data

Some of the stations in the dataset aren't there any more, so we get a 404 when we hit the API. To get round this I've just written a row for the Id with a commonName of "Not Found".

That said, we do have this information in the original dataset, so with a bit of fiddling we can update the bike_points data with the correct commonName.

bike_points = spark.read.csv('bike_points.csv', header=True, inferSchema=True)

combined_bike_points = bike_points.where(bike_points.commonName == "Not Found") \
                      .join(data, data.StartStationId == bike_points.pointId)\
                      .select(bike_points.pointId \
                            , data.StartStationName.alias("commonName") \
                            , bike_points.lat \
                            , bike_points.lon \
                            , bike_points.placeType \
                            , bike_points.properties) \
                      .distinct()

bike_points = combined_bike_points \
              .union(bike_points \
                     .where(bike_points.commonName != "Not Found"))

Okay, a bit long winded, but we now have the station data to work with too.
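
As a quick check that everything hangs together, the cleaned up bike_points can be joined back onto the journey data to put a name and location against each rental. A sketch, using the columns created above:

# attach the start station details to each rental via the start station id
rides_with_stations = dated_data \
    .join(bike_points, dated_data.StartStationId == bike_points.pointId) \
    .select('RentalId', 'StartDate', 'commonName', 'lat', 'lon')

rides_with_stations.show(5)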