door2door_data_challenge


My attempt at door2door's hiring code challenge for data engineers.

Part 1

The goal and subtasks

  1. Parse the files in resources. Each line of each file is a JSON string describing an event.
  2. Extract all events and store them.
  3. The storage format should make it easy to answer the question: What is the average distance traveled by our vehicles during an operating period?

Choosing the technology and technique

I am working in Python. The approach I decided upon is:

  1. Read the events using the json module (part of the standard library); see the snippet below.
  2. Model the data using sqlalchemy.
  3. Store the processed data in a SQL database.
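
As a quick illustration of step 1, each line of a data file decodes independently (a sketch; the file name is taken from the directory layout shown later in this README):

import json

# each line of the file is one JSON-encoded event
with open("2019-06-01-15-17-1-events.txt") as fh:
    events = [json.loads(line) for line in fh]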

The reasons

  • Ease of further analysis: Pandas is my data analysis tool of choice in Python. It can easily read from a SQL database.
  • Flexible: Without any code change, one can choose between many relational databases: sqlite for prototyping and testing, PostgreSQL/MySQL/etc. for production.
  • Scalable: Processing potentially large volumes of data this way should be limited only by the database’s capabilities.
  • Robust: Both json and sqlalchemy are well-supported and time-tested libraries.

Testing this solution in Binder (no setup required)

Follow this link. After a few seconds, you will get an interactive Jupyter notebook demonstrating how to use this solution to compute the average distance traveled by the vehicles during an operating period. If you would like to try out this solution on a local machine, please see the section “Using this solution locally”.

Part 2

We need to “adapt the events in the Data Model so that during the analysis, the location updates during break times are not considered when the average distance traveled is computed”.

Adding new events

I would suggest adding two new events on vehicle:

  1. break_begin,
  2. break_end.

Examples (break_id is a unique id for each break, shared by the matching break_begin and break_end events):

{
  "event": "break_begin",
  "on": "vehicle",
  "at": "2019-05-19T16:02:02Z",
  "data": {
    "id": "s0m3-v3h1cl3-1D",
    "break_id": "b43ak-1D",
    "location": {
      "lat": 50.1,
      "lng": 40.2,
      "at": "2019-05-19T16:02:00Z"
    }
  },
  "organization_id": "id-of-organization"
}

{
  "event": "break_end",
  "on": "vehicle",
  "at": "2019-05-19T16:22:02Z",
  "data": {
    "id": "s0m3-v3h1cl3-1D",
    "break_id": "b43ak-1D",
    "location": {
      "lat": 51.4,
      "lng": 40.0,
      "at": "2019-05-19T16:22:00Z"
    }
  },
  "organization_id": "id-of-organization"
}

Modelling Break objects

Then we can store the break objects by adding this class to the data_structures module:

class VehicleBreak(Base):
    __tablename__ = "break"
    # the break_id field of the break_begin/break_end events
    id = Column(String(100), primary_key=True, index=True)
    # foreign key on vehicle
    vehicle_id = Column(
        String(100), ForeignKey("vehicle.id"), nullable=False, index=True
    )
    # UTC times at which the break began and ended
    start = Column(DateTime(timezone=False), nullable=False)
    finish = Column(DateTime(timezone=False), nullable=False)
    organization_id = Column(String(100), nullable=False)
Then, while computing the average distance travelled, we can ignore the location updates made during breaks. Here I outline one possible way of doing this computation: modify the function get_single_vehicle_trajectory_during_operating_period in notebooks/calculate_distance_travelled.ipynb (also in Binder) along the following lines (a sketch; travel_distance_from_trajectory stands for the helper that computes the kilometres travelled along a trajectory segment):

def single_vehicle_travel_distance_during_operating_period(
        op_period_id, vehicle_id, sql_engine):
    session = Session(bind=sql_engine)  # from sqlalchemy.orm import Session
    op = session.query(OperatingPeriod).get(op_period_id)

    def trajectory_segment(t_from, t_to):
        # VehicleLocation objects with gps_time between t_from and t_to
        return (session.query(VehicleLocation)
                .filter(VehicleLocation.vehicle_id == vehicle_id,
                        VehicleLocation.gps_time >= t_from,
                        VehicleLocation.gps_time <= t_to)
                .order_by(VehicleLocation.gps_time)
                .all())

    # all breaks of vehicle_id that overlap the interval (op.start, op.finish)
    breaks = (session.query(VehicleBreak)
              .filter(VehicleBreak.vehicle_id == vehicle_id,
                      VehicleBreak.start < op.finish,
                      VehicleBreak.finish > op.start)
              .order_by(VehicleBreak.start)
              .all())

    # Iterate through the breaks. For each one, take the trajectory segment
    # only up to the start of that break.
    t_lower = op.start
    distance_travelled_outside_breaks = 0
    for this_break in breaks:
        t_upper = min(this_break.start, op.finish)
        distance_travelled_outside_breaks += travel_distance_from_trajectory(
            trajectory_segment(t_lower, t_upper))
        t_lower = this_break.finish
        # no need to iterate further if the operating period is over
        if t_lower > op.finish:
            break
    # the stretch after the last break (or the whole period, if no breaks)
    if t_lower < op.finish:
        distance_travelled_outside_breaks += travel_distance_from_trajectory(
            trajectory_segment(t_lower, op.finish))
    return distance_travelled_outside_breaks
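
For completeness, travel_distance_from_trajectory could be as simple as summing the great-circle distances between consecutive GPS fixes; a sketch (assuming the VehicleLocation objects are ordered by gps_time):

import math

def travel_distance_from_trajectory(locations):
    # sum of haversine (great-circle) distances, in kilometres,
    # between consecutive location fixes
    R = 6371.0  # mean Earth radius in km
    total = 0.0
    for a, b in zip(locations, locations[1:]):
        phi1, phi2 = math.radians(a.lat), math.radians(b.lat)
        dphi = math.radians(b.lat - a.lat)
        dlmb = math.radians(b.lng - a.lng)
        h = (math.sin(dphi / 2) ** 2
             + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
        total += 2 * R * math.asin(math.sqrt(h))
    return total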

Using this solution locally

Clone the repository

Install the package (and its prerequisites)

cd door2door_data_challenge
pip install .
pip install .[doc]     # only if you want to build the docs using sphinx
pip install .[dev]     # installs jupyter and pandas; not needed for the data processing and storage
pip install .[testing] # only if you want to run the test suite

Process the data

  1. Choose a SQL database that sqlalchemy is compatible with.
  2. Create an EventParser object, supplying it the path under which the data is stored and the connection string that sqlalchemy can use to connect to your DB.

Example code:

from door2door_data_challenge import EventParser
# tell it where the data is (contains directories `2018`, `2019`, etc.)
data_root = 'd2d-code-challenges/data/resources/'
# initialize an EventParser object that will store the events to an SQL DB
ep = EventParser(data_root, sql_connection_string='sqlite:///test.sqlite')
# parse all the events, and store to test.sqlite
ep.parse_all()
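
The stored tables can then be read straight back into pandas for analysis (a sketch; the vehicle table name is the one the Vehicle model maps to):

import pandas as pd
# read all registered vehicles from the DB created above
vehicles = pd.read_sql("SELECT * FROM vehicle", "sqlite:///test.sqlite")
print(vehicles.head())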

Details of the solution

This solution is a Python package, door2door_data_challenge, which consists of:

  1. The data models in the data_structures module.
  2. The event parsing logic in the event_parsing module.

Here is the full documentation.

Tests and docs

Building the docs locally

pip install .[doc]
cd docs
make html

The HTML docs will then be generated in docs/build/. The docs are also rebuilt on each commit to master and uploaded to Read the Docs.

Running the test suite

From the root of the repository, run:

pip install .[testing]
pytest -v

On each commit, the test suite is automatically run by Travis CI.

Detailed documentation

Module event_parsing

This module contains the EventParser class, which does the actual job of reading the events from the data source and writing them to a SQL database.

class EventParser(data_root: str, sql_connection_string: str, engine_creation_kwargs=None)

Parses the events stored in the format described in https://github.com/door2door-io/d2d-code-challenges/tree/master/data, and stores them in a SQL database.

Parameters:
  • data_root (string) – Path to the directory under which the raw data is stored.
  • sql_connection_string (string) – Connection string for the SQL DB. The format needs to be understood by sqlalchemy.create_engine
  • engine_creation_kwargs (dict) – kwargs needed to be passed to sqlalchemy.create_engine along with the sql_connection_string

Examples

Parse data in resources/ and store in an in-memory sqlite database (resources/ must have subdirectory structure specified in the docstring of iterate_chronologically_over_data_files()):

>>> ev_parser = EventParser(data_root='resources/', sql_connection_string='sqlite://')
>>> ev_parser.parse_all()

parse_all() → None

Parses all data files in self.data_root, processes and stores the events in the SQL DB.

parse_single_file(filepath: str) → None

Parses a single data file, processes and stores the events.

Parameters: filepath – Path to the file to parse.

iterate_chronologically_over_data_files() → Iterator[str]

Iterates over all the files in self.data_root in chronological order: files containing events that occurred earlier are yielded earlier.

Returns: an iterator yielding file paths in chronological order.

Note

Here we assume the directory structure is like the following:

DATA_ROOT/
  2019/      <- year
    06/      <- month
      01/    <- day
        15/  <- hour
          17 <- events occurring in the 17th minute of the hour
             2019-06-01-15-17-1-events.txt <- events occurring at 15:17:01
             2019-06-01-15-17-11-events.txt
             2019-06-01-15-17-2-events.txt
             ...
          18
             2019-06-01-15-18-1-events.txt
             2019-06-01-15-18-11-events.txt
             2019-06-01-15-18-2-events.txt
             ...
    ...
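
A minimal sketch of how such a chronological walk could be implemented (not the package's actual code; it assumes every directory name under data_root is numeric, as in the tree above):

import os

def iterate_chronologically(data_root):
    for dirpath, dirnames, filenames in os.walk(data_root):
        # sorting the numeric year/month/day/hour/minute directories
        # in place makes os.walk visit them in time order
        dirnames.sort(key=int)
        # within a minute, the sixth dash-separated field of the file
        # name is the second; plain string sorting would wrongly put
        # '...-17-11-events.txt' before '...-17-2-events.txt'
        for name in sorted(filenames, key=lambda f: int(f.split("-")[5])):
            yield os.path.join(dirpath, name)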
process_event(event: dict, session: sqlalchemy.orm.session.Session) → None

Parses a single JSON encoded event, and stores it to the SQL DB.

Parameters:
  • event (dict) – The JSON-decoded event.
  • session (sqlalchemy.orm.session.Session) – Session used to write to the SQL DB.

create_operating_period(session: sqlalchemy.orm.session.Session, event: dict) → None

Creates a new data_structures.OperatingPeriod object, inserts into the SQL DB.

Args: Same as in process_event().

Note

We assume that each operating period has a unique id.

delete_operating_period(session: sqlalchemy.orm.session.Session, event: dict) → None

Marks an existing operating period (data_structures.OperatingPeriod) as deleted by setting the deleted_at attribute, updates the SQL DB.

Args: Same as in process_event().

Note

If the operating period has not been created yet, this event is ignored. Also, we do not check whether the “organization_id”, “start” and “finish” fields of this event match the values provided when this operating period was created.

register_vehicle(session: sqlalchemy.orm.session.Session, event: dict) → None

Adds a new data_structures.Vehicle object to the SQL DB. If the location data is available in event, also calls update_vehicle_location().

Args: Same as in process_event().

Note

We assume that a vehicle is registered only once, i.e. the ‘id’ fields of all vehicle registration events are unique.

deregister_vehicle(session: sqlalchemy.orm.session.Session, event: dict) → None

Marks a vehicle (data_structures.Vehicle) as deregistered by setting the deregistered_at attribute, updates the SQL DB. If location data is present in the event, also calls update_vehicle_location().

Args: Same as in process_event().

Note

If the vehicle to be deregistered has not been registered yet, this event will be ignored.

update_vehicle_location(session: sqlalchemy.orm.session.Session, event: dict) → None

Adds a new data_structures.VehicleLocation object to the SQL DB corresponding to the GPS update event.

Args: Same as in process_event().

Note

If the location field contains null values in any of the fields lat, lng and at, we do not add a new VehicleLocation object.
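
A guard along these lines would enforce that (a sketch; dateparser is dateutil's ISO-8601 parser, as in the break-handler sketch in Part 2, and the constructor arguments follow the VehicleLocation columns documented below):

# skip GPS updates with incomplete location data
loc = event["data"].get("location") or {}
if all(loc.get(key) is not None for key in ("lat", "lng", "at")):
    session.add(VehicleLocation(
        vehicle_id=event["data"]["id"],
        lat=loc["lat"],
        lng=loc["lng"],
        gps_time=dateparser.isoparse(loc["at"]),
        event_time=dateparser.isoparse(event["at"]),
    ))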

Module data_structures

This module contains the Object-relational mappings we need for storing the events we read as rows in a SQL database.

class Vehicle(**kwargs)

Stores vehicles. The columns are:

  • id (str) – Id of the vehicle. Primary key, indexed.
  • organization_id (str) – Id of the organization the vehicle belongs to.
  • registered_at (datetime) – The UTC time this vehicle was registered.
  • deregistered_at (datetime) – The UTC time this vehicle was deregistered (null if not deregistered).

class VehicleLocation(**kwargs)

Stores a location update (via GPS sensor) of a vehicle at an instant of time. The columns are:

  • id (int) – A unique autogenerated id. Primary key.
  • vehicle_id (str) – Id of the vehicle. Indexed.
  • lat, lng (float) – The GPS latitude/longitude.
  • gps_time (datetime) – The UTC time when the GPS location update was emitted. Indexed.
  • event_time (datetime) – The UTC time when the location update event was received in the system.

Note

The lat, lng and gps_time columns are all non-nullable. If a GPS update event has null values in any of its lat, lng or at fields, please ignore that event: do not attempt to create a VehicleLocation object with null values.

class OperatingPeriod(**kwargs)

Stores operating periods, which are specified by the organizations. The columns are:

  • id (str) – Unique id. Primary key, indexed.
  • start, finish (datetime) – The start and end of the operating period.
  • organization_id (str) – Id of the organization that registered this operating period.
  • created_at (datetime) – The UTC time when this operating period was created.
  • deleted_at (datetime) – The UTC time when this operating period was deleted (null if not deleted).
