Finding the right implementation for a job scheduler
I'm writing a job scheduler, like cron. Each time the scheduler wakes up, it needs to look at every job's schedule and decide whether it's time to run it. Unlike the rest of the scheduler, this logic is a pure function, which makes it very testable.
The very first version had a simple way of specifying schedules – every=5m or start_at=10am, every=1d – and that code had all kinds of bugs.
The second version supported a few discrete classes of schedules:
- hourly (run every X minutes)
- daily (run at fixed times of day)
- weekly (run on fixed days of the week)
- monthly (run on fixed days of the month)
Each schedule type implements get_next_scheduled_time, which takes the current time and returns the earliest scheduled time strictly after it.
My first shot at writing the code looked something like:
```python
@dataclass
class MonthlySchedule(BaseSchedule):
    times_of_day: List[datetime.time]
    days_of_month: List[int]

    @override
    def get_next_scheduled_time(self, now: datetime.datetime) -> datetime.datetime:
        now_date = now.date()
        days_in_month = timehelper.days_in_month(now_date)
        next_time_of_day = find_first(self.times_of_day, lambda x: x > now.time())
        if next_time_of_day is None:
            next_time_of_day = self.times_of_day[0]
            # Example: Job is scheduled to run on the 1st and 31st of every month. Today
            # is Nov 30, after the last scheduled time. Without this condition, we'd just
            # return Nov 30 again.
            if now.day == days_in_month:
                return datetime.datetime.combine(
                    timehelper.next_month(now_date).replace(day=self.days_of_month[0]),
                    next_time_of_day,
                    tzinfo=now.tzinfo,
                )
            next_day_of_month = find_first(
                self.days_of_month, lambda x: x >= now.day + 1
            )
        else:
            next_day_of_month = find_first(self.days_of_month, lambda x: x >= now.day)
        if next_day_of_month is not None:
            return combine(now, next_time_of_day).replace(
                day=min(days_in_month, next_day_of_month)
            )
        else:
            return datetime.datetime.combine(
                timehelper.next_month(now_date), next_time_of_day, tzinfo=now.tzinfo
            )
```
This was hard for me to write, hard to read, and hard to convince myself was correct.
The trouble here is that we are taking an absolute point in time and trying to directly calculate the next scheduled time and handle all the edge cases that arise. (What if it's the end of the day? What if it's the last day of the month? What if the month only has 28 days?)
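The post doesn't show the timehelper module that both versions rely on. As an assumption, here is one plausible way to write its two helpers, days_in_month and next_month, using the standard library's calendar.monthrange. These are hypothetical stand-ins, not the author's implementations, but they make the month-length edge cases above concrete.

```python
import calendar
import datetime


def days_in_month(d: datetime.date) -> int:
    """Number of days in the month containing d (hypothetical timehelper stand-in)."""
    # calendar.monthrange returns (weekday of the 1st, number of days in the month).
    return calendar.monthrange(d.year, d.month)[1]


def next_month(d: datetime.date) -> datetime.date:
    """First day of the month after the one containing d (hypothetical stand-in)."""
    if d.month == 12:
        return datetime.date(d.year + 1, 1, 1)
    return datetime.date(d.year, d.month + 1, 1)


# Edge cases from the text: a 30-day month, a 28-day February, and year rollover.
print(days_in_month(datetime.date(2023, 11, 30)))  # 30
print(days_in_month(datetime.date(2023, 2, 10)))  # 28
print(next_month(datetime.date(2023, 12, 31)))  # 2024-01-01
```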
A few observations:
- Each of these schedules divides the timeline into repeating intervals: a daily schedule into days, a weekly schedule into weeks, and so on.
- It is easy to map an absolute point in time to its interval.
  - This isn't true of every possible schedule (for example, "run every other day"), but it is true of the schedules my scheduler allows.
- It is easy to calculate the list of scheduled times for an interval.
- For any point in time, the next scheduled time lies either in the current interval or the next one.
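To make the second observation concrete, here is a sketch of mapping a timestamp to the start of its interval, and to the start of the next one, for the daily, weekly, and monthly cases. The function names are hypothetical, and the weekly version assumes weeks start on Monday (the convention of Python's date.weekday()).

```python
import datetime


def start_of_day(now: datetime.datetime) -> datetime.date:
    # A daily schedule's interval is simply the calendar date.
    return now.date()


def start_of_week(now: datetime.datetime) -> datetime.date:
    # Assumes Monday-based weeks: date.weekday() is 0 for Monday ... 6 for Sunday.
    d = now.date()
    return d - datetime.timedelta(days=d.weekday())


def start_of_next_week(now: datetime.datetime) -> datetime.date:
    # The next interval is always exactly 7 days later.
    return start_of_week(now) + datetime.timedelta(days=7)


def start_of_month(now: datetime.datetime) -> datetime.date:
    # The same mapping MonthlySchedule.get_interval uses.
    return now.date().replace(day=1)


now = datetime.datetime(2023, 11, 30, 23, 15)  # a Thursday
print(start_of_day(now), start_of_week(now), start_of_month(now))
# 2023-11-30 2023-11-27 2023-11-01
```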
Given this, a generic implementation of get_next_scheduled_time is simply:
- Calculate the current and next intervals and the list of scheduled times for each.
- Return the first time from the list that is greater than the current time.
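As a minimal, self-contained sketch of those two steps, here is the generic method applied to a daily schedule, folded into one class for brevity. DailySchedule and the local find_first helper are assumptions for illustration (the post's own helpers aren't shown). It also assumes times_of_day is sorted and non-empty, so concatenating the two intervals' lists yields times in ascending order.

```python
import datetime
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")


def find_first(items: Iterable[T], pred: Callable[[T], bool]) -> Optional[T]:
    # Hypothetical helper: the first item satisfying pred, or None.
    return next((x for x in items if pred(x)), None)


@dataclass
class DailySchedule:
    times_of_day: List[datetime.time]  # assumed sorted ascending and non-empty

    def get_interval(self, now: datetime.datetime) -> datetime.date:
        return now.date()

    def get_next_interval(self, now: datetime.datetime) -> datetime.date:
        return now.date() + datetime.timedelta(days=1)

    def get_times(
        self, day: datetime.date, tz: Optional[datetime.tzinfo]
    ) -> List[datetime.datetime]:
        return [datetime.datetime.combine(day, t, tzinfo=tz) for t in self.times_of_day]

    def get_next_scheduled_time(self, now: datetime.datetime) -> datetime.datetime:
        tz = now.tzinfo
        # Step 1: scheduled times for the current and next intervals, in order.
        candidates = self.get_times(self.get_interval(now), tz) + self.get_times(
            self.get_next_interval(now), tz
        )
        # Step 2: the first candidate strictly after now.
        r = find_first(candidates, lambda x: x > now)
        assert r is not None
        return r


sched = DailySchedule([datetime.time(9, 0), datetime.time(17, 30)])
print(sched.get_next_scheduled_time(datetime.datetime(2023, 11, 30, 12, 0)))
# 2023-11-30 17:30:00
print(sched.get_next_scheduled_time(datetime.datetime(2023, 11, 30, 17, 30)))
# 2023-12-01 09:00:00
```

Note that the second call lands exactly on a scheduled time and still moves on to the next one, because the comparison is strictly greater than.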
This does a little bit of unnecessary work (e.g., if it's 11pm I don't really need to calculate that the schedule includes 12:05am, 12:10am, etc.) but it breaks the problem down into manageable pieces. Each piece is easy to verify, and it's easy to convince yourself that the algorithm as a whole is correct.
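```python
import datetime
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

from typing_extensions import override  # typing.override on Python 3.12+

# timehelper, find_first, combine, and Tzinfo are helpers from the scheduler's
# codebase that aren't shown here.


class BaseSchedule(ABC):
    @abstractmethod
    def get_interval(self, now: datetime.datetime) -> datetime.date: ...

    @abstractmethod
    def get_next_interval(self, now: datetime.datetime) -> datetime.date: ...

    @abstractmethod
    def get_times(
        self, interval: datetime.date, tzinfo: Tzinfo
    ) -> List[datetime.datetime]: ...

    def get_next_scheduled_time(self, now: datetime.datetime) -> datetime.datetime:
        tz = now.tzinfo
        this_interval = self.get_interval(now)
        next_interval = self.get_next_interval(now)
        # Assumes get_times returns ascending times (e.g. times_of_day is sorted),
        # so find_first below returns the earliest match.
        scheduled_times = self.get_times(this_interval, tz) + self.get_times(
            next_interval, tz
        )
        r = find_first(scheduled_times, lambda x: x > now)
        assert r is not None
        return r


@dataclass
class MonthlySchedule(BaseSchedule):
    times_of_day: List[datetime.time]
    days_of_month: List[int]

    @override
    def get_interval(self, now: datetime.datetime) -> datetime.date:
        return now.date().replace(day=1)

    @override
    def get_next_interval(self, now: datetime.datetime) -> datetime.date:
        return timehelper.next_month(now.date())

    @override
    def get_times(
        self, start_of_month: datetime.date, tzinfo: Tzinfo
    ) -> List[datetime.datetime]:
        max_day = timehelper.days_in_month(start_of_month)
        # Clamp days past the end of the month (e.g. 31 -> 30 in November);
        # the set removes any duplicates this creates.
        days_of_month = set(min(x, max_day) for x in self.days_of_month)
        r: List[datetime.datetime] = []
        for day in sorted(days_of_month):
            date = start_of_month.replace(day=day)
            for time_of_day in self.times_of_day:
                r.append(combine(date, time_of_day, tzinfo))
        return r
```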
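Because each piece is a pure function, the tricky case from the first version's comment (runs on the 1st and 31st, and it's Nov 30 after the last scheduled time) can be checked directly. The sketch below is self-contained: it inlines hypothetical stand-ins for timehelper.days_in_month and timehelper.next_month, replaces find_first and combine with standard-library equivalents, and keeps the same get_times logic. It assumes times_of_day is sorted.

```python
import calendar
import datetime
from dataclasses import dataclass
from typing import List, Optional


def days_in_month(d: datetime.date) -> int:
    # Hypothetical stand-in for timehelper.days_in_month.
    return calendar.monthrange(d.year, d.month)[1]


def next_month(d: datetime.date) -> datetime.date:
    # Hypothetical stand-in for timehelper.next_month: first day of the following month.
    return datetime.date(d.year + d.month // 12, d.month % 12 + 1, 1)


@dataclass
class MonthlySchedule:
    times_of_day: List[datetime.time]  # assumed sorted ascending
    days_of_month: List[int]

    def get_times(
        self, start_of_month: datetime.date, tz: Optional[datetime.tzinfo]
    ) -> List[datetime.datetime]:
        max_day = days_in_month(start_of_month)
        # Clamp e.g. day 31 to the month's last day; the set drops duplicates.
        days = sorted({min(x, max_day) for x in self.days_of_month})
        return [
            datetime.datetime.combine(start_of_month.replace(day=day), t, tzinfo=tz)
            for day in days
            for t in self.times_of_day
        ]

    def get_next_scheduled_time(self, now: datetime.datetime) -> datetime.datetime:
        this_month = now.date().replace(day=1)
        candidates = self.get_times(this_month, now.tzinfo) + self.get_times(
            next_month(this_month), now.tzinfo
        )
        # First candidate strictly after now (plays the role of find_first).
        return next(x for x in candidates if x > now)


sched = MonthlySchedule([datetime.time(9, 0)], [1, 31])
print(sched.get_next_scheduled_time(datetime.datetime(2023, 11, 30, 10, 0)))
# 2023-12-01 09:00:00
```

No special case for the last day of the month is needed: November's list ends at Nov 30 09:00, so the search simply falls through to December's list.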
Indeed, when I rewrote the whole scheduler this way, it passed all of my tests on the first try.