UMBRA/backend/app/services/recurrence.py
Kyle Pope 3b63d18f63 Fix first occurrence missing from recurring events
The parent template is hidden from the calendar listing, but the
recurrence service was only generating children starting from the
second occurrence. Now generates a child for the parent's own start
date so the first occurrence is always visible.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 01:55:50 +08:00

182 lines
6.4 KiB
Python

"""
Recurrence service — generates materialized child CalendarEvent ORM objects
from a parent event's recurrence_rule JSON string.
All datetimes are naive (no timezone) to match TIMESTAMP WITHOUT TIME ZONE in DB.
"""
import json
from datetime import datetime, timedelta
from typing import Optional
from app.models.calendar_event import CalendarEvent
def _nth_weekday_of_month(year: int, month: int, weekday: int, week: int) -> Optional[datetime]:
    """
    Locate the ``week``-th occurrence of ``weekday`` (0=Mon…6=Sun) inside the
    given month.

    Returns a ``datetime`` at midnight for that day, or None when the computed
    day lands outside the month (e.g. a 5th Monday in a month that only has four).
    """
    month_start = datetime(year, month, 1)
    # Days from the 1st to the first matching weekday (0..6), plus the whole
    # weeks needed to reach the requested occurrence.
    offset_days = (weekday - month_start.weekday()) % 7 + 7 * (week - 1)
    candidate = month_start + timedelta(days=offset_days)
    return candidate if candidate.month == month else None
def _next_month(year: int, month: int) -> tuple[int, int]:
    """Return the (year, month) pair following the given one, rolling December into January."""
    if month == 12:
        return year + 1, 1
    return year, month + 1


def _stepped_starts(first: datetime, step: timedelta, horizon: datetime):
    """Yield first, first + step, ... for as long as the value is strictly before horizon."""
    current = first
    while current < horizon:
        yield current
        current += step


def _monthly_starts(parent_start: datetime, horizon: datetime, candidate_for):
    """
    Yield one start datetime per month, beginning with the month after
    parent_start's month and stopping at the first candidate >= horizon.

    candidate_for(year, month) returns the start datetime for that month, or
    None when the month has no matching day (that month is skipped).
    """
    year, month = _next_month(parent_start.year, parent_start.month)
    # The year bound guarantees termination even when candidate_for never
    # produces a date (e.g. a rule asking for day 32).
    while year <= horizon.year + 1:
        candidate = candidate_for(year, month)
        if candidate is not None:
            if candidate >= horizon:
                return
            yield candidate
        year, month = _next_month(year, month)


def generate_occurrences(
    parent: CalendarEvent,
    horizon_days: int = 365,
) -> list[CalendarEvent]:
    """
    Parse parent.recurrence_rule (JSON string) and materialise child CalendarEvent
    objects up to horizon_days from parent.start_datetime (horizon is exclusive).

    Supported rule types:
    - every_n_days       : {"type": "every_n_days", "interval": N}
    - weekly             : {"type": "weekly", "weekday": 0-6}  (0=Mon, 6=Sun)
    - monthly_nth_weekday: {"type": "monthly_nth_weekday", "week": 1-4, "weekday": 0-6}
    - monthly_date       : {"type": "monthly_date", "day": 1-31}

    Defaults applied when a rule field is missing: interval -> 1 (also used
    for values below 1), weekday -> the parent's weekday, week -> 1,
    day -> the parent's day of month.

    The first element of the result is always a child at parent.start_datetime.
    A rule with an unrecognised "type" therefore yields just that one child.
    Monthly rules start from the month after the parent's month; weekly rules
    start from the next matching weekday strictly after the parent's start.

    Returns an empty list when recurrence_rule is empty, is not valid JSON, or
    decodes to something other than a JSON object.

    Returns unsaved ORM objects — the caller must add them to the session.
    """
    if not parent.recurrence_rule:
        return []
    try:
        rule = json.loads(parent.recurrence_rule)
    except (json.JSONDecodeError, TypeError):
        return []
    # Valid JSON that is not an object (list, string, number, null) has no
    # .get(); treat it like any other unusable rule instead of raising.
    if not isinstance(rule, dict):
        return []

    rule_type = rule.get("type", "")
    parent_start: datetime = parent.start_datetime
    duration: timedelta = parent.end_datetime - parent_start
    horizon: datetime = parent_start + timedelta(days=horizon_days)

    # Fields to copy verbatim from parent to each child
    shared_fields = dict(
        title=parent.title,
        description=parent.description,
        all_day=parent.all_day,
        color=parent.color,
        location_id=parent.location_id,
        is_starred=parent.is_starred,
        calendar_id=parent.calendar_id,
        # Children do not carry the recurrence_rule; parent is the template
        recurrence_rule=None,
        parent_event_id=parent.id,
        is_recurring=True,
    )

    def _make_child(occ_start: datetime) -> CalendarEvent:
        # Every child keeps the parent's duration; original_start records the
        # generated start of this occurrence.
        return CalendarEvent(
            **shared_fields,
            start_datetime=occ_start,
            end_datetime=occ_start + duration,
            original_start=occ_start,
        )

    def _rule_weekday() -> int:
        # An explicit 0 (Monday) is a valid weekday, so test against None
        # rather than truthiness before falling back to the parent's weekday.
        raw = rule.get("weekday")
        return int(raw if raw is not None else parent_start.weekday())

    # Always generate a child for the parent's own start date (the first occurrence).
    # The parent template is hidden from list views, so without this the first date
    # would have no visible event.
    occurrences: list[CalendarEvent] = [_make_child(parent_start)]

    if rule_type == "every_n_days":
        interval = max(int(rule.get("interval") or 1), 1)
        step = timedelta(days=interval)
        starts = _stepped_starts(parent_start + step, step, horizon)

    elif rule_type == "weekly":
        # Next matching weekday strictly after the parent: a zero offset means
        # "same weekday", which becomes one full week later.
        days_ahead = (_rule_weekday() - parent_start.weekday()) % 7 or 7
        starts = _stepped_starts(
            parent_start + timedelta(days=days_ahead),
            timedelta(weeks=1),
            horizon,
        )

    elif rule_type == "monthly_nth_weekday":
        week = int(rule.get("week") or 1)
        weekday = _rule_weekday()

        def _nth_weekday_start(year: int, month: int) -> Optional[datetime]:
            target = _nth_weekday_of_month(year, month, weekday, week)
            if target is None:
                # Skip months where the Nth weekday doesn't exist
                return None
            # Preserve the time-of-day from the parent
            return target.replace(
                hour=parent_start.hour,
                minute=parent_start.minute,
                second=parent_start.second,
                microsecond=0,
            )

        starts = _monthly_starts(parent_start, horizon, _nth_weekday_start)

    elif rule_type == "monthly_date":
        day = int(rule.get("day") or parent_start.day)

        def _fixed_day_start(year: int, month: int) -> Optional[datetime]:
            try:
                return datetime(
                    year, month, day,
                    parent_start.hour,
                    parent_start.minute,
                    parent_start.second,
                )
            except ValueError:
                # Day out of range for this month (e.g. day 31 in April) — skip
                return None

        starts = _monthly_starts(parent_start, horizon, _fixed_day_start)

    else:
        # Unrecognised rule type: only the first occurrence is produced.
        starts = ()

    occurrences.extend(_make_child(start) for start in starts)
    return occurrences