UMBRA/backend/app/services/recurrence.py
Kyle Pope e22cad1d86 Fix QA findings: firstDay reactivity, state revert, helper extraction
- W1: Add key prop to FullCalendar so firstDay change triggers remount
- W2: Revert firstDayOfWeek toggle state on API failure
- S1: Extract _rule_int helper in recurrence service to reduce duplication

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 02:02:15 +08:00

188 lines
6.5 KiB
Python

"""
Recurrence service — generates materialized child CalendarEvent ORM objects
from a parent event's recurrence_rule JSON string.
All datetimes are naive (no timezone) to match TIMESTAMP WITHOUT TIME ZONE in DB.
"""
import json
from datetime import datetime, timedelta
from typing import Optional
from app.models.calendar_event import CalendarEvent
def _nth_weekday_of_month(year: int, month: int, weekday: int, week: int) -> Optional[datetime]:
"""
Return the date of the Nth (1-4) occurrence of a given weekday (0=Mon…6=Sun)
in a month, or None if it doesn't exist (e.g. 5th Monday in a short month).
"""
# Find the first occurrence of the weekday in the month
first_day = datetime(year, month, 1)
# days_ahead: how many days until the target weekday
days_ahead = (weekday - first_day.weekday()) % 7
first_occurrence = first_day + timedelta(days=days_ahead)
# Advance by (week - 1) full weeks
target = first_occurrence + timedelta(weeks=week - 1)
if target.month != month:
return None
return target
def _rule_int(rule: dict, key: str, default: int) -> int:
"""Get an int from the rule dict, falling back to default if missing or None."""
val = rule.get(key)
return int(val) if val is not None else default
def generate_occurrences(
parent: CalendarEvent,
horizon_days: int = 365,
) -> list[CalendarEvent]:
"""
Parse parent.recurrence_rule (JSON string) and materialise child CalendarEvent
objects up to horizon_days from parent.start_datetime.
Supported rule types:
- every_n_days : {"type": "every_n_days", "interval": N}
- weekly : {"type": "weekly", "weekday": 0-6} (0=Mon, 6=Sun)
- monthly_nth_weekday: {"type": "monthly_nth_weekday", "week": 1-4, "weekday": 0-6}
- monthly_date : {"type": "monthly_date", "day": 1-31}
Returns unsaved ORM objects — the caller must add them to the session.
"""
if not parent.recurrence_rule:
return []
try:
rule: dict = json.loads(parent.recurrence_rule)
except (json.JSONDecodeError, TypeError):
return []
rule_type: str = rule.get("type", "")
parent_start: datetime = parent.start_datetime
parent_end: datetime = parent.end_datetime
duration: timedelta = parent_end - parent_start
horizon: datetime = parent_start + timedelta(days=horizon_days)
# Fields to copy verbatim from parent to each child
shared_fields = dict(
title=parent.title,
description=parent.description,
all_day=parent.all_day,
color=parent.color,
location_id=parent.location_id,
is_starred=parent.is_starred,
calendar_id=parent.calendar_id,
# Children do not carry the recurrence_rule; parent is the template
recurrence_rule=None,
parent_event_id=parent.id,
is_recurring=True,
)
occurrences: list[CalendarEvent] = []
def _make_child(occ_start: datetime) -> CalendarEvent:
occ_end = occ_start + duration
return CalendarEvent(
**shared_fields,
start_datetime=occ_start,
end_datetime=occ_end,
original_start=occ_start,
)
# Always generate a child for the parent's own start date (the first occurrence).
# The parent template is hidden from list views, so without this the first date
# would have no visible event.
occurrences.append(_make_child(parent_start))
if rule_type == "every_n_days":
interval: int = _rule_int(rule, "interval", 1)
if interval < 1:
interval = 1
current = parent_start + timedelta(days=interval)
while current < horizon:
occurrences.append(_make_child(current))
current += timedelta(days=interval)
elif rule_type == "weekly":
weekday: int = _rule_int(rule, "weekday", parent_start.weekday())
# Start from the next week after the parent
days_ahead = (weekday - parent_start.weekday()) % 7
if days_ahead == 0:
days_ahead = 7
current = parent_start + timedelta(days=days_ahead)
while current < horizon:
occurrences.append(_make_child(current))
current += timedelta(weeks=1)
elif rule_type == "monthly_nth_weekday":
week: int = _rule_int(rule, "week", 1)
weekday = _rule_int(rule, "weekday", parent_start.weekday())
# Advance month by month
year, month = parent_start.year, parent_start.month
# Move to the next month from the parent
month += 1
if month > 12:
month = 1
year += 1
while True:
target = _nth_weekday_of_month(year, month, weekday, week)
if target is None:
# Skip months where Nth weekday doesn't exist
pass
else:
# Preserve the time-of-day from the parent
occ_start = target.replace(
hour=parent_start.hour,
minute=parent_start.minute,
second=parent_start.second,
microsecond=0,
)
if occ_start >= horizon:
break
occurrences.append(_make_child(occ_start))
month += 1
if month > 12:
month = 1
year += 1
# Safety: if we've passed the horizon year by 2, stop
if year > horizon.year + 1:
break
elif rule_type == "monthly_date":
day: int = _rule_int(rule, "day", parent_start.day)
year, month = parent_start.year, parent_start.month
month += 1
if month > 12:
month = 1
year += 1
while True:
# Some months don't have day 29-31
try:
occ_start = datetime(
year, month, day,
parent_start.hour,
parent_start.minute,
parent_start.second,
)
except ValueError:
# Day out of range for this month — skip
month += 1
if month > 12:
month = 1
year += 1
if year > horizon.year + 1:
break
continue
if occ_start >= horizon:
break
occurrences.append(_make_child(occ_start))
month += 1
if month > 12:
month = 1
year += 1
if year > horizon.year + 1:
break
return occurrences