""" Recurrence service — generates materialized child CalendarEvent ORM objects from a parent event's recurrence_rule JSON string. All datetimes are naive (no timezone) to match TIMESTAMP WITHOUT TIME ZONE in DB. """ import json from datetime import datetime, timedelta from typing import Optional from app.models.calendar_event import CalendarEvent # Hard cap: never generate more than 730 child events regardless of horizon_days. MAX_OCCURRENCES = 730 def _nth_weekday_of_month(year: int, month: int, weekday: int, week: int) -> Optional[datetime]: """ Return the date of the Nth (1-4) occurrence of a given weekday (0=Mon…6=Sun) in a month, or None if it doesn't exist (e.g. 5th Monday in a short month). """ # Find the first occurrence of the weekday in the month first_day = datetime(year, month, 1) # days_ahead: how many days until the target weekday days_ahead = (weekday - first_day.weekday()) % 7 first_occurrence = first_day + timedelta(days=days_ahead) # Advance by (week - 1) full weeks target = first_occurrence + timedelta(weeks=week - 1) if target.month != month: return None return target def _rule_int(rule: dict, key: str, default: int) -> int: """Get an int from the rule dict, falling back to default if missing or None.""" val = rule.get(key) return int(val) if val is not None else default def generate_occurrences( parent: CalendarEvent, horizon_days: int = 365, ) -> list[CalendarEvent]: """ Parse parent.recurrence_rule (JSON string) and materialise child CalendarEvent objects up to horizon_days from parent.start_datetime. Supported rule types: - every_n_days : {"type": "every_n_days", "interval": N} - weekly : {"type": "weekly", "weekday": 0-6} (0=Mon, 6=Sun) - monthly_nth_weekday: {"type": "monthly_nth_weekday", "week": 1-4, "weekday": 0-6} - monthly_date : {"type": "monthly_date", "day": 1-31} Returns unsaved ORM objects — the caller must add them to the session. """ if not parent.recurrence_rule: return [] try: rule: dict = json.loads(parent.recurrence_rule) except (json.JSONDecodeError, TypeError): return [] rule_type: str = rule.get("type", "") parent_start: datetime = parent.start_datetime parent_end: datetime = parent.end_datetime duration: timedelta = parent_end - parent_start horizon: datetime = parent_start + timedelta(days=horizon_days) # Fields to copy verbatim from parent to each child shared_fields = dict( title=parent.title, description=parent.description, all_day=parent.all_day, color=parent.color, location_id=parent.location_id, is_starred=parent.is_starred, calendar_id=parent.calendar_id, # Children do not carry the recurrence_rule; parent is the template recurrence_rule=None, parent_event_id=parent.id, is_recurring=True, ) occurrences: list[CalendarEvent] = [] def _make_child(occ_start: datetime) -> CalendarEvent: occ_end = occ_start + duration return CalendarEvent( **shared_fields, start_datetime=occ_start, end_datetime=occ_end, original_start=occ_start, ) # Always generate a child for the parent's own start date (the first occurrence). # The parent template is hidden from list views, so without this the first date # would have no visible event. occurrences.append(_make_child(parent_start)) if rule_type == "every_n_days": interval: int = _rule_int(rule, "interval", 1) if interval < 1: interval = 1 # Adaptive horizon: cap daily-ish events (interval < 7) to 90 days effective_horizon = horizon if interval >= 7 else min(horizon, parent_start + timedelta(days=90)) current = parent_start + timedelta(days=interval) while current < effective_horizon: if len(occurrences) >= MAX_OCCURRENCES: break occurrences.append(_make_child(current)) current += timedelta(days=interval) elif rule_type == "weekly": weekday: int = _rule_int(rule, "weekday", parent_start.weekday()) # Start from the next week after the parent days_ahead = (weekday - parent_start.weekday()) % 7 if days_ahead == 0: days_ahead = 7 current = parent_start + timedelta(days=days_ahead) while current < horizon: if len(occurrences) >= MAX_OCCURRENCES: break occurrences.append(_make_child(current)) current += timedelta(weeks=1) elif rule_type == "monthly_nth_weekday": week: int = _rule_int(rule, "week", 1) weekday = _rule_int(rule, "weekday", parent_start.weekday()) # Advance month by month year, month = parent_start.year, parent_start.month # Move to the next month from the parent month += 1 if month > 12: month = 1 year += 1 while True: if len(occurrences) >= MAX_OCCURRENCES: break target = _nth_weekday_of_month(year, month, weekday, week) if target is None: # Skip months where Nth weekday doesn't exist pass else: # Preserve the time-of-day from the parent occ_start = target.replace( hour=parent_start.hour, minute=parent_start.minute, second=parent_start.second, microsecond=0, ) if occ_start >= horizon: break occurrences.append(_make_child(occ_start)) month += 1 if month > 12: month = 1 year += 1 # Safety: if we've passed the horizon year by 2, stop if year > horizon.year + 1: break elif rule_type == "monthly_date": day: int = _rule_int(rule, "day", parent_start.day) year, month = parent_start.year, parent_start.month month += 1 if month > 12: month = 1 year += 1 while True: if len(occurrences) >= MAX_OCCURRENCES: break # Some months don't have day 29-31 try: occ_start = datetime( year, month, day, parent_start.hour, parent_start.minute, parent_start.second, ) except ValueError: # Day out of range for this month — skip month += 1 if month > 12: month = 1 year += 1 if year > horizon.year + 1: break continue if occ_start >= horizon: break occurrences.append(_make_child(occ_start)) month += 1 if month > 12: month = 1 year += 1 if year > horizon.year + 1: break return occurrences