Files
Subscribarr/arr_api/services.py

193 lines
6.9 KiB
Python

# arr_api/services.py
import os
import requests
from datetime import datetime, timedelta, timezone
from dateutil.parser import isoparse
# ENV-Fallbacks
ENV_SONARR_URL = os.getenv("SONARR_URL", "")
ENV_SONARR_KEY = os.getenv("SONARR_API_KEY", "")
ENV_RADARR_URL = os.getenv("RADARR_URL", "")
ENV_RADARR_KEY = os.getenv("RADARR_API_KEY", "")
DEFAULT_DAYS = int(os.getenv("ARR_DEFAULT_DAYS", "30"))
class ArrServiceError(Exception):
pass
def _get(url, headers, params=None, timeout=5):
try:
r = requests.get(url, headers=headers, params=params or {}, timeout=timeout)
r.raise_for_status()
return r.json()
except requests.exceptions.RequestException as e:
raise ArrServiceError(str(e))
def _abs_url(base: str, p: str | None) -> str | None:
if not p:
return None
return f"{base.rstrip('/')}" + p if p.startswith("/") else p
def sonarr_calendar(days: int | None = None, base_url: str | None = None, api_key: str | None = None):
base = (base_url or ENV_SONARR_URL).strip()
key = (api_key or ENV_SONARR_KEY).strip()
if not base or not key:
return []
d = days or DEFAULT_DAYS
start = datetime.now(timezone.utc)
end = start + timedelta(days=d)
url = f"{base.rstrip('/')}/api/v3/calendar"
headers = {"X-Api-Key": key}
data = _get(url, headers, params={
"start": start.date().isoformat(),
"end": end.date().isoformat(),
"unmonitored": "false",
"includeSeries": "true",
})
out = []
for ep in data:
series = ep.get("series") or {}
# Poster finden
poster = None
for img in (series.get("images") or []):
if (img.get("coverType") or "").lower() == "poster":
poster = img.get("remoteUrl") or _abs_url(base, img.get("url"))
if poster:
break
aired = isoparse(ep["airDateUtc"]).isoformat() if ep.get("airDateUtc") else None
out.append({
"seriesId": series.get("id"),
"seriesTitle": series.get("title"),
"seriesStatus": (series.get("status") or "").lower(),
"seriesPoster": poster,
"seriesOverview": series.get("overview") or "",
"seriesGenres": series.get("genres") or [],
"episodeId": ep.get("id"),
"seasonNumber": ep.get("seasonNumber"),
"episodeNumber": ep.get("episodeNumber"),
"title": ep.get("title"),
"airDateUtc": aired,
"tvdbId": series.get("tvdbId"),
"imdbId": series.get("imdbId"),
"network": series.get("network"),
})
return [x for x in out if x["seriesStatus"] == "continuing"]
def radarr_calendar(days: int | None = None, base_url: str | None = None, api_key: str | None = None):
base = (base_url or ENV_RADARR_URL).strip()
key = (api_key or ENV_RADARR_KEY).strip()
if not base or not key:
return []
d = days or DEFAULT_DAYS
start = datetime.now(timezone.utc)
end = start + timedelta(days=d)
url = f"{base.rstrip('/')}/api/v3/calendar"
headers = {"X-Api-Key": key}
data = _get(url, headers, params={
"start": start.date().isoformat(),
"end": end.date().isoformat(),
"unmonitored": "false",
"includeMovie": "true",
})
out = []
for it in data:
movie = it.get("movie") or it
# Poster finden
poster = None
for img in (movie.get("images") or []):
if (img.get("coverType") or "").lower() == "poster":
poster = img.get("remoteUrl") or _abs_url(base, img.get("url"))
if poster:
break
out.append({
"movieId": movie.get("id"),
"title": movie.get("title"),
"year": movie.get("year"),
"tmdbId": movie.get("tmdbId"),
"imdbId": movie.get("imdbId"),
"posterUrl": poster,
"overview": movie.get("overview") or "",
"inCinemas": movie.get("inCinemas"),
"physicalRelease": movie.get("physicalRelease"),
"digitalRelease": movie.get("digitalRelease"),
"hasFile": movie.get("hasFile"),
"isAvailable": movie.get("isAvailable"),
})
def is_upcoming(m):
for k in ("inCinemas", "physicalRelease", "digitalRelease"):
v = m.get(k)
if v:
try:
if isoparse(v) > datetime.now(timezone.utc):
return True
except Exception:
pass
return False
return [m for m in out if is_upcoming(m)]
def sonarr_get_series(series_id: int, base_url: str | None = None, api_key: str | None = None) -> dict | None:
"""Fetch a single series by id from Sonarr, return dict with title, overview, poster and genres."""
base = (base_url or ENV_SONARR_URL).strip()
key = (api_key or ENV_SONARR_KEY).strip()
if not base or not key:
return None
url = f"{base.rstrip('/')}/api/v3/series/{series_id}"
headers = {"X-Api-Key": key}
data = _get(url, headers)
# Poster
poster = None
for img in (data.get("images") or []):
if (img.get("coverType") or "").lower() == "poster":
poster = img.get("remoteUrl") or _abs_url(base, img.get("url"))
if poster:
break
return {
"series_id": data.get("id"),
"series_title": data.get("title"),
"series_overview": data.get("overview") or "",
"series_genres": data.get("genres") or [],
"series_poster": poster,
}
def radarr_lookup_movie_by_title(title: str, base_url: str | None = None, api_key: str | None = None) -> dict | None:
"""Lookup a movie by title via Radarr /api/v3/movie/lookup. Returns title, poster, overview, genres, year, tmdbId, and id if present."""
base = (base_url or ENV_RADARR_URL).strip()
key = (api_key or ENV_RADARR_KEY).strip()
if not base or not key or not title:
return None
url = f"{base.rstrip('/')}/api/v3/movie/lookup"
headers = {"X-Api-Key": key}
data = _get(url, headers, params={"term": title})
if not data:
return None
# naive pick: exact match by title (case-insensitive), else first
best = None
for it in data:
if (it.get("title") or "").lower() == title.lower():
best = it
break
if not best:
best = data[0]
poster = None
for img in (best.get("images") or []):
if (img.get("coverType") or "").lower() == "poster":
poster = img.get("remoteUrl") or _abs_url(base, img.get("url"))
if poster:
break
return {
"movie_id": best.get("id") or 0,
"title": best.get("title") or title,
"poster": poster,
"overview": best.get("overview") or "",
"genres": best.get("genres") or [],
"year": best.get("year"),
"tmdbId": best.get("tmdbId"),
}