""" Recurrence service — generates materialized child CalendarEvent ORM objects from a parent event's recurrence_rule JSON string. All datetimes are naive (no timezone) to match TIMESTAMP WITHOUT TIME ZONE in DB. """ import json from datetime import datetime, timedelta from typing import Optional from app.models.calendar_event import CalendarEvent def _nth_weekday_of_month(year: int, month: int, weekday: int, week: int) -> Optional[datetime]: """ Return the date of the Nth (1-4) occurrence of a given weekday (0=Mon…6=Sun) in a month, or None if it doesn't exist (e.g. 5th Monday in a short month). """ # Find the first occurrence of the weekday in the month first_day = datetime(year, month, 1) # days_ahead: how many days until the target weekday days_ahead = (weekday - first_day.weekday()) % 7 first_occurrence = first_day + timedelta(days=days_ahead) # Advance by (week - 1) full weeks target = first_occurrence + timedelta(weeks=week - 1) if target.month != month: return None return target def generate_occurrences( parent: CalendarEvent, horizon_days: int = 365, ) -> list[CalendarEvent]: """ Parse parent.recurrence_rule (JSON string) and materialise child CalendarEvent objects up to horizon_days from parent.start_datetime. Supported rule types: - every_n_days : {"type": "every_n_days", "interval": N} - weekly : {"type": "weekly", "weekday": 0-6} (0=Mon, 6=Sun) - monthly_nth_weekday: {"type": "monthly_nth_weekday", "week": 1-4, "weekday": 0-6} - monthly_date : {"type": "monthly_date", "day": 1-31} Returns unsaved ORM objects — the caller must add them to the session. """ if not parent.recurrence_rule: return [] try: rule: dict = json.loads(parent.recurrence_rule) except (json.JSONDecodeError, TypeError): return [] rule_type: str = rule.get("type", "") parent_start: datetime = parent.start_datetime parent_end: datetime = parent.end_datetime duration: timedelta = parent_end - parent_start horizon: datetime = parent_start + timedelta(days=horizon_days) # Fields to copy verbatim from parent to each child shared_fields = dict( title=parent.title, description=parent.description, all_day=parent.all_day, color=parent.color, location_id=parent.location_id, is_starred=parent.is_starred, calendar_id=parent.calendar_id, # Children do not carry the recurrence_rule; parent is the template recurrence_rule=None, parent_event_id=parent.id, is_recurring=True, ) occurrences: list[CalendarEvent] = [] def _make_child(occ_start: datetime) -> CalendarEvent: occ_end = occ_start + duration return CalendarEvent( **shared_fields, start_datetime=occ_start, end_datetime=occ_end, original_start=occ_start, ) if rule_type == "every_n_days": interval: int = int(rule.get("interval") or 1) if interval < 1: interval = 1 current = parent_start + timedelta(days=interval) while current < horizon: occurrences.append(_make_child(current)) current += timedelta(days=interval) elif rule_type == "weekly": weekday: int = int(rule.get("weekday") if rule.get("weekday") is not None else parent_start.weekday()) # Start from the week after the parent days_ahead = (weekday - parent_start.weekday()) % 7 if days_ahead == 0: days_ahead = 7 # skip the parent's own week current = parent_start + timedelta(days=days_ahead) while current < horizon: occurrences.append(_make_child(current)) current += timedelta(weeks=1) elif rule_type == "monthly_nth_weekday": week: int = int(rule.get("week") or 1) weekday = int(rule.get("weekday") if rule.get("weekday") is not None else parent_start.weekday()) # Advance month by month year, month = parent_start.year, parent_start.month # Move to the next month from the parent month += 1 if month > 12: month = 1 year += 1 while True: target = _nth_weekday_of_month(year, month, weekday, week) if target is None: # Skip months where Nth weekday doesn't exist pass else: # Preserve the time-of-day from the parent occ_start = target.replace( hour=parent_start.hour, minute=parent_start.minute, second=parent_start.second, microsecond=0, ) if occ_start >= horizon: break occurrences.append(_make_child(occ_start)) month += 1 if month > 12: month = 1 year += 1 # Safety: if we've passed the horizon year by 2, stop if year > horizon.year + 1: break elif rule_type == "monthly_date": day: int = int(rule.get("day") or parent_start.day) year, month = parent_start.year, parent_start.month month += 1 if month > 12: month = 1 year += 1 while True: # Some months don't have day 29-31 try: occ_start = datetime( year, month, day, parent_start.hour, parent_start.minute, parent_start.second, ) except ValueError: # Day out of range for this month — skip month += 1 if month > 12: month = 1 year += 1 if year > horizon.year + 1: break continue if occ_start >= horizon: break occurrences.append(_make_child(occ_start)) month += 1 if month > 12: month = 1 year += 1 if year > horizon.year + 1: break return occurrences