door2door_data_challenge¶
My attempt at door2door's hiring code challenge (data engineer).
Part 1¶
The goal and subtasks¶
- Parse the files in resources. Each line of each file is a JSON string describing an event.
- Extract all events and store them.
- The storage format should make it easy to answer the question: what is the average distance traveled by our vehicles during an operating period?
Choosing the technology and technique¶
I am working in Python. The approach I decided upon is:
- Read the events using the json module (part of the standard library).
- Do the data modelling using sqlalchemy.
- Store the processed data in a SQL database.
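The first step, reading line-delimited JSON events, can be sketched with the standard-library json module (the function name here is hypothetical, not part of the package):

```python
import json

def iter_events(lines):
    """Yield one parsed event dict per non-blank line.

    Each line is expected to be a standalone JSON string describing
    a single event, as in the raw data files under `resources`.
    """
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)
```

Wrapped around `open(filepath)`, this yields plain dicts that the data-modelling layer can then map to SQL rows.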
The reasons¶
- Ease of further analysis: Pandas is my data analysis tool of choice in Python, and it can easily read from a SQL database.
- Flexible: without any code change, one can choose between many relational databases: sqlite for prototyping and testing, PostgreSQL/MySQL/etc. for production.
- Scalable: processing potentially large volumes of data this way should be limited only by the database's capabilities.
- Robust: both json and sqlalchemy are well-supported, time-tested libraries.
Testing this solution in Binder (no setup required)¶
Follow this link. After a few seconds, you will get an interactive Jupyter notebook demonstrating how to use this solution to compute the average distance traveled by the vehicles during an operating period. If you would like to try out this solution on a local machine, please see the section “Using this solution locally”.
Part 2¶
We need to “adapt the events in the Data Model so that during the analysis, the location updates during break times are not considered when the average distance traveled is computed”.
Adding new events¶
I would suggest adding two new events on vehicle: break_begin and break_end.
Examples:

{
    "event": "break_begin",
    "on": "vehicle",
    "at": "2019-05-19T16:02:02Z",
    "data": {
        "id": "s0m3-v3h1cl3-1D",
        "break_id": "b43ak-1D",  # a unique id for each break
        "location": {
            "lat": 50.1,
            "lng": 40.2,
            "at": "2016-02-02T16:02:00Z"
        }
    },
    "organization_id": "id-of-organization"
},
{
    "event": "break_end",
    "on": "vehicle",
    "at": "2019-05-19T16:22:02Z",
    "data": {
        "id": "s0m3-v3h1cl3-1D",
        "break_id": "b43ak-1D",  # the same break_id as in the matching break_begin
        "location": {
            "lat": 51.4,
            "lng": 40.0,
            "at": "2016-02-02T16:22:00Z"
        }
    },
    "organization_id": "id-of-organization"
}
Modelling Break objects¶
Then we can store the break objects by adding this class in the data_structures module:
class VehicleBreak(Base):
    __tablename__ = "break"

    id = Column(String, primary_key=True, index=True)
    # foreign key referencing vehicle.id
    vehicle_id = Column(
        String(100), ForeignKey("vehicle.id"), nullable=False, index=True
    )
    start = Column(DateTime(timezone=False), nullable=False)
    finish = Column(DateTime(timezone=False), nullable=False)
    organization_id = Column(String(100), nullable=False)
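Before such rows can be inserted, the parser has to match each break_begin with its break_end. A minimal, dependency-free sketch of that pairing logic (the function name is hypothetical; the event layout follows the examples above):

```python
from datetime import datetime

def pair_break_events(events):
    """Match break_begin/break_end events by break_id.

    Returns a dict mapping break_id -> (vehicle_id, start, finish).
    A break whose break_end has not arrived yet has finish=None.
    """
    breaks = {}
    for ev in events:
        if ev.get("on") != "vehicle" or ev.get("event") not in ("break_begin", "break_end"):
            continue
        data = ev["data"]
        key = data["break_id"]
        at = datetime.strptime(ev["at"], "%Y-%m-%dT%H:%M:%SZ")
        vehicle_id, start, finish = breaks.get(key, (data["id"], None, None))
        if ev["event"] == "break_begin":
            start = at
        else:
            finish = at
        breaks[key] = (vehicle_id, start, finish)
    return breaks
```

Each completed pair can then be turned into one VehicleBreak row.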
Then, while computing the average distance travelled, we can ignore the location updates made during breaks. Here I outline one possible way of doing this computation. We can modify the function get_single_vehicle_trajectory_during_operating_period in notebooks/calculate_distance_travelled.ipynb (also in Binder) in the following way (the code is not complete, just an outline):
def single_vehicle_travel_distance_during_operating_period(op_period_id, vehicle_id, sql_engine):
    op = ...  # <OperatingPeriod with id=op_period_id>
    # Get all breaks of vehicle_id that overlap with the time interval
    # (op.start, op.finish), sorted by their start time.
    breaks = ...  # <fetch all breaks ...>
    # Iterate through the breaks. Each iteration adds the trajectory
    # segment between the previous break (or op.start) and the start of
    # this break.
    t_lower = op.start
    distance_travelled_outside_breaks = 0
    for this_break in breaks:
        t_upper = min(this_break.start, op.finish)
        # <filter VehicleLocation objs with gps_time between t_lower & t_upper>
        trajectory_segment_before_this_break_began = ...
        # compute kilometers travelled during this trajectory segment
        # and add it to distance_travelled_outside_breaks
        distance_travelled_outside_breaks += travel_distance_from_trajectory(
            trajectory_segment_before_this_break_began)
        t_lower = this_break.finish
        # no need to iterate further if the operating period is over
        if t_lower > op.finish:
            break
    # Do not forget the final segment: from the end of the last break
    # (or from op.start, if there were no breaks at all) to op.finish.
    if t_lower < op.finish:
        # <filter VehicleLocation objs with gps_time between t_lower & op.finish>
        final_segment = ...
        distance_travelled_outside_breaks += travel_distance_from_trajectory(final_segment)
    return distance_travelled_outside_breaks
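The helper travel_distance_from_trajectory is not shown in the outline; one plausible implementation (a sketch, not necessarily the notebook's actual code) sums the great-circle (haversine) distance between consecutive GPS points:

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def haversine_km(lat1, lng1, lat2, lng2):
    """Great-circle distance in kilometers between two (lat, lng) points."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlmb = radians(lng2 - lng1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

def travel_distance_from_trajectory(points):
    """Sum pairwise distances over a chronologically ordered
    sequence of (lat, lng) tuples."""
    return sum(
        haversine_km(lat1, lng1, lat2, lng2)
        for (lat1, lng1), (lat2, lng2) in zip(points, points[1:])
    )
```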
Using this solution locally¶
Clone the repository¶
Install the package (and its prerequisites)¶
cd door2door_data_challenge
pip install .
pip install .[doc]      # only if you want to build the docs using sphinx
pip install .[dev]      # installs jupyter and pandas; not needed for the data processing and storage
pip install .[testing]  # only if you want to run the test suite
Process the data¶
- Choose a SQL database that sqlalchemy is compatible with.
- Create an EventParser object, supplying it the connection string that sqlalchemy can use to connect to your DB, and the path to where the data is stored.
Example code:
from door2door_data_challenge import EventParser
# tell where the data is (Contains directories `2018`, `2019` etc.).
data_root = 'd2d-code-challenges/data/resources/'
# initialize an EventParser object that will store the events to an SQL DB
ep = EventParser(data_root, sql_connection_string='sqlite:///test.sqlite')
# parse all the events, and store to test.sqlite
ep.parse_all()
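Once parse_all() completes, test.sqlite is an ordinary database that any SQL client (or pandas.read_sql) can query. A quick sanity check, sketched with the standard-library sqlite3 module (the table and column names here are assumptions based on the data model documented below):

```python
import sqlite3

def count_location_updates(db_path):
    """Count the stored GPS updates per vehicle, as a quick sanity
    check that parsing and storage worked."""
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(
            "SELECT vehicle_id, COUNT(*) FROM vehicle_location "
            "GROUP BY vehicle_id ORDER BY vehicle_id"
        ).fetchall()
    return dict(rows)
```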
Details of the solution¶
This solution consists of a Python package, door2door_data_challenge.
The package consists of:
- The data models in the data_structures module.
- The event parsing logic in the event_parsing module.
Here is the full documentation.
Tests and docs¶
Building the docs locally¶
pip install .[doc]
cd docs
make html
Then the HTML docs will be generated in docs/build/. The docs are also autogenerated on each commit to master and uploaded to Read the Docs.
Detailed documentation¶
Module event_parsing¶
This module contains the EventParser class, which does the actual job of reading the events from the data source and writing them to a SQL database.
- class EventParser(data_root: str, sql_connection_string: str, engine_creation_kwargs=None)¶
Parses the events stored in the format described in https://github.com/door2door-io/d2d-code-challenges/tree/master/data, and stores them in a SQL database.
Parameters:
- data_root (string) – Path to the directory under which the raw data is stored.
- sql_connection_string (string) – Connection string for the SQL DB. The format needs to be understood by sqlalchemy.create_engine.
- engine_creation_kwargs (dict) – kwargs to be passed to sqlalchemy.create_engine along with the sql_connection_string.
Examples
Parse data in resources/ and store in an in-memory sqlite database (resources/ must have the subdirectory structure specified in the docstring of iterate_chronologically_over_data_files()):

>>> ev_parser = EventParser(data_root='resources/', sql_connection_string='sqlite://')
>>> ev_parser.parse_all()
- parse_all() → None¶
Parses all data files in self.data_root, processes and stores the events in the SQL DB.
- parse_single_file(filepath: str) → None¶
Parses a single data file, processes and stores the events.
Parameters:
- filepath – Path to the file to parse.
- iterate_chronologically_over_data_files() → str¶
Iterates over all the files in self.data_root in chronological order: files containing events that occurred earlier are yielded earlier.
Returns: an iterator yielding file paths in the right order. Return type: iterator
Note
Here we assume the directory structure is like the following:

DATA_ROOT/
    2019/                                     <- year
        06/                                   <- month
            12/                               <- date
                16/                           <- hour
                    17/                       <- minute of the hour in which the events occurred
                        2019-06-01-15-17-1-events.txt    <- events occurring at 15:17:01
                        2019-06-01-15-17-11-events.txt
                        2019-06-01-15-17-2-events.txt
                        ...
                    18/
                        2019-06-01-15-18-1-events.txt
                        2019-06-01-15-18-11-events.txt
                        2019-06-01-15-18-2-events.txt
                        ...
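Such an iteration could be implemented as sketched below (an illustration under the directory-layout assumption above, not the package's actual code). The zero-padded directory names sort chronologically as plain strings, but the seconds field in the file names does not, which is why files need a numeric sort key:

```python
import os
import re

# YYYY-MM-DD-HH-MM-SS-events.txt; the seconds field is not zero-padded
_FILENAME_RE = re.compile(r"\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-(\d+)-events\.txt$")

def iterate_chronologically(data_root):
    """Yield event-file paths under data_root, oldest first.

    '...-17-2-events.txt' must come before '...-17-11-events.txt',
    so file names are sorted numerically by the seconds field.
    """
    def seconds_field(name):
        match = _FILENAME_RE.match(name)
        return int(match.group(1)) if match else -1

    for dirpath, dirnames, filenames in os.walk(data_root):
        dirnames.sort()  # visit year/month/date/hour/minute dirs in order
        for name in sorted(filenames, key=seconds_field):
            yield os.path.join(dirpath, name)
```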
- process_event(event: dict, session: sqlalchemy.orm.session.Session) → None¶
Parses a single JSON-encoded event, and stores it to the SQL DB.
Parameters:
- event – Parsed JSON, format specified in https://github.com/door2door-io/d2d-code-challenges/tree/master/data#data-model
- session – The session used to store the events to the DB.
- create_operating_period(session: sqlalchemy.orm.session.Session, event: dict) → None¶
Creates a new data_structures.OperatingPeriod object and inserts it into the SQL DB.
Args: Same as in process_event().
Note
We assume that each operating period has a unique id.
- delete_operating_period(session: sqlalchemy.orm.session.Session, event: dict) → None¶
Marks an existing operating period (data_structures.OperatingPeriod) as deleted by setting the deleted_at attribute, and updates the SQL DB.
Args: Same as in process_event().
Note
If the operating period has not been created yet, this event is ignored. Also, we do not check whether the “organization_id”, “start” and “finish” fields of this event match the values provided when the operating period was created.
- register_vehicle(session: sqlalchemy.orm.session.Session, event: dict) → None¶
Adds a new data_structures.Vehicle object to the SQL DB. If location data is available in the event, also calls update_vehicle_location().
Args: Same as in process_event().
Note
We assume that a vehicle is registered only once, i.e. the ‘id’ field of each vehicle registration event is unique.
- deregister_vehicle(session: sqlalchemy.orm.session.Session, event: dict) → None¶
Marks a vehicle (data_structures.Vehicle) as deregistered by setting the deregistered_at attribute, and updates the SQL DB. If location data is present in the event, also calls update_vehicle_location().
Args: Same as in process_event().
Note
If the vehicle to be deregistered has not been registered yet, this event is ignored.
- update_vehicle_location(session: sqlalchemy.orm.session.Session, event: dict) → None¶
Adds a new data_structures.VehicleLocation object to the SQL DB, corresponding to the GPS update event.
Args: Same as in process_event().
Note
If the location field contains null values in any of the fields lat, lng and at, we do not add a new VehicleLocation object.
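The guard described in this note could look like the following sketch (the function name is hypothetical; the field names follow the event format shown earlier):

```python
def extract_location(event):
    """Return (lat, lng, at) from a GPS update event, or None if any
    of the three fields is missing or null."""
    loc = event.get("data", {}).get("location") or {}
    values = tuple(loc.get(k) for k in ("lat", "lng", "at"))
    if any(v is None for v in values):
        return None
    return values
```

A None return would tell the caller to skip creating a VehicleLocation object for that event.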
Module data_structures¶
This module contains the object-relational mappings we need for storing the events we read as rows in a SQL database.
- class Vehicle(**kwargs)¶
Stores vehicles. The columns are:
- id (str) – Id of the vehicle. Primary key, indexed.
- organization_id (str) – Id of the organization the vehicle belongs to.
- registered_at (datetime) – The UTC time this vehicle was registered.
- deregistered_at (datetime) – The UTC time this vehicle was deregistered (null if not deregistered).
class
VehicleLocation
(**kwargs)[source]¶ Stores a location update (via GPS sensor) of a vehicle at an instance of time. The columns are:
-
id
¶ An unique autogenerated id. Primary key.
Type: int
-
vehicle_id
¶ Id of the vehicle. Indexed.
Type: str
-
lat, lng
The GPS lat/lng.
Type: float
-
gps_time
¶ The UTC time when the GPS location update was emitted. Indexed.
Type: datetime
-
event_time
¶ The UTC time when the location update event was received in the system.
Type: datetime
Note
lat, lng and at attributes are all not nullable. If a GPS update has any of these fields missing, please ignore that event: Do not attempt to create a VehicleLocation object with null values.
-
-
class
OperatingPeriod
(**kwargs)[source]¶ Stores operating periods, which are specified by the organizations. The columns are:
-
id
¶ Unique id. Primary key, indexed.
Type: str
-
start, finish
The start and end of the operating period.
Type: datetime
-
organization_id
¶ id of the organization that registered this operating period.
Type: str
-
created_at
¶ The UTC time when this operating period was created.
Type: datetime
-
deleted_at
¶ The UTC time when this operating period was deleted (null if not deleted).
Type: datetime
-