#!/Users/venv311/bin/python3.11
import os, sys, io, random, time
from dotenv import load_dotenv
from pathlib import Path
from typing import Optional
from PIL import Image
import requests
from requests_oauthlib import OAuth1
from datetime import datetime
from transformers.utils.logging import set_verbosity_error
set_verbosity_error()

# If you have HEIC files: registering the opener is optional, so any failure
# here (package missing or unusable) is deliberately ignored.
try:
    import pillow_heif
    pillow_heif.register_heif_opener()
except Exception:
    pass  # ok if not installed

# --- Load .env first
# A Python string literal is not shell-expanded, so "$USER" must be expanded
# explicitly; otherwise the path points at a directory literally named "$USER".
# expandvars leaves the text untouched when USER is not set.
load_dotenv(dotenv_path=os.path.expandvars("/Users/$USER/.env"), override=True)
# --- Optional local image caption (BLIP) ---
def describe_image(path: Path) -> Optional[str]:
    """Return a short BLIP-generated caption for the image at *path*, or None.

    Captioning is opt-in: unless the ``CAPTION_MODE`` environment variable
    equals ``"blip"`` (case-insensitive) the function returns None right away,
    without importing ``transformers`` or opening the file.

    Args:
        path: Image file to describe.

    Returns:
        The generated caption, stripped and cut to at most 180 characters, or
        None when captioning is disabled, the caption is empty, or any step
        raises (the error is printed, never propagated).
    """
    if os.getenv("CAPTION_MODE", "none").lower() != "blip":
        return None
    try:
        from transformers import BlipProcessor, BlipForConditionalGeneration  # lazy import

        model_name = "Salesforce/blip-image-captioning-base"
        proc = BlipProcessor.from_pretrained(model_name)
        model = BlipForConditionalGeneration.from_pretrained(model_name)
        # The context manager releases the file handle once the pixels have
        # been copied into the converted image.
        with Image.open(path) as src:
            im = src.convert("RGB")
        inputs = proc(images=im, return_tensors="pt")
        out = model.generate(**inputs, max_new_tokens=30)
        txt = proc.decode(out[0], skip_special_tokens=True).strip()
        return txt[:180] or None
    except Exception as e:
        # Best effort: a caption failure must not stop the post.
        print("Caption skipped:", e)
        return None
# --- Rate-limit aware POST ---
def _upload_stream_positions(files):
    """Return ``(stream, offset)`` pairs for each seekable file object in *files*."""
    if not files:
        return []
    entries = files.values() if hasattr(files, "values") else [item[1] for item in files]
    marks = []
    for entry in entries:
        # A value is either a bare file object or a (filename, fileobj, ...) tuple.
        if isinstance(entry, (tuple, list)) and len(entry) > 1:
            stream = entry[1]
        else:
            stream = entry
        if hasattr(stream, "seek") and hasattr(stream, "tell"):
            try:
                marks.append((stream, stream.tell()))
            except (OSError, ValueError):
                pass  # not really seekable (pipe, closed stream): leave it alone
    return marks


def post_with_rate_limits(url, auth, json=None, files=None, max_retries=0, max_sleep=120):
    """POST to *url*, waiting and retrying when the server answers HTTP 429.

    Args:
        url: Endpoint to POST to.
        auth: Auth object handed to ``requests.post``.
        json: Optional JSON body.
        files: Optional multipart ``files`` argument for ``requests.post``.
        max_retries: Extra attempts allowed after the first one; values below
            zero are treated as zero.
        max_sleep: Longest single wait, in seconds, the function will accept.

    Returns:
        The first non-429 response, or the last 429 response when retries are
        exhausted or the required wait exceeds *max_sleep*.

    The wait is taken from the ``x-rate-limit-reset`` header when it is all
    digits (read as an epoch timestamp, plus a 3 second margin); otherwise
    exponential backoff with up to one second of jitter is used.
    """
    # Remember where each upload stream started: the first attempt reads it to
    # the end, so a retry would otherwise send an empty body.
    marks = _upload_stream_positions(files)
    attempts = max(0, max_retries) + 1
    for attempt in range(attempts):
        if attempt:
            for stream, offset in marks:
                stream.seek(offset)
        r = requests.post(url, auth=auth, json=json, files=files, timeout=60)
        if r.status_code != 429:
            return r
        if attempt == attempts - 1:
            # Sleeping now would only delay the caller: nothing is retried afterwards.
            print("[RATE LIMIT] 429. No retries left.")
            break
        reset_hdr = r.headers.get("x-rate-limit-reset")
        if reset_hdr and reset_hdr.isdigit():
            wait = max(0, int(reset_hdr) - int(time.time())) + 3
        else:
            wait = (2 ** attempt) + random.uniform(0, 1.0)
        if wait > max_sleep:
            print(f"[RATE LIMIT] 429. Need ~{wait:.0f}s, over cap ({max_sleep}s). Aborting.")
            return r  # caller can decide to exit(1)
        print(f"[RATE LIMIT] 429. Sleeping {wait:.1f}s…")
        time.sleep(wait)
    return r
def env(name: str, default: Optional[str] = None) -> Optional[str]:
    """Read environment variable *name*, treating unset and empty alike.

    Returns the variable's value when it is a non-empty string, otherwise
    *default*.
    """
    return os.getenv(name) or default
def first(*names):
    """Return the value of the first variable in *names* that is set and non-empty.

    Names are checked in the order given; None is returned when none of them
    has a non-empty value (or when no names are passed).
    """
    return next(filter(None, map(os.getenv, names)), None)
# OAuth 1.0a credentials. Each one may be supplied under an X_-prefixed,
# TW_-prefixed, or bare variable name; the first non-empty value wins.
CK = first("X_CONSUMER_KEY", "TW_CONSUMER_KEY", "CONSUMER_KEY")
CS = first("X_CONSUMER_SECRET", "TW_CONSUMER_SECRET", "CONSUMER_SECRET")
AT = first("X_ACCESS_TOKEN", "TW_ACCESS_TOKEN", "ACCESS_TOKEN")
ATS = first("X_ACCESS_TOKEN_SECRET", "TW_ACCESS_TOKEN_SECRET", "ACCESS_TOKEN_SECRET")

# Folder scanned for photos and the fallback caption, both overridable via env.
PHOTO_DIR = Path(env("PHOTO_DIR", "/Users/Pictures/Peru"))
DEFAULT_CAPTION = env("DEFAULT_CAPTION", "Peru 🇵🇪")

# Stop early, naming every credential that is absent or empty.
_required = (
    ("CONSUMER_KEY", CK),
    ("CONSUMER_SECRET", CS),
    ("ACCESS_TOKEN", AT),
    ("ACCESS_TOKEN_SECRET", ATS),
)
missing = [label for label, value in _required if not value]
if missing:
    sys.exit(f"Missing creds: {', '.join(missing)}")

if not PHOTO_DIR.exists():
    sys.exit(f"PHOTO_DIR not found: {PHOTO_DIR}")
print(f"[INFO] Using folder: {PHOTO_DIR}")
# Collect images (case-insensitive, incl. HEIC if pillow-heif is present)
exts = {".jpg", ".jpeg", ".png", ".heic", ".heif"}
# rglob("*") yields directories as well as files; keep regular files only so a
# folder whose name ends in an image extension can never be selected.
imgs = [
    p for p in PHOTO_DIR.rglob("*")
    if p.suffix.lower() in exts and p.is_file()
]
print(f"[INFO] Found {len(imgs)} images")
if not imgs:
    sys.exit("No images found. Check PHOTO_DIR and extensions.")

# One photo per run, picked uniformly at random.
img_path = random.choice(imgs)
print(f"[INFO] Selected: {img_path}")
# Prepare image under ~4.9MB for upload
def prepare_image(path: Path, target_bytes=4_900_000):
    """Re-encode the image at *path* as a JPEG no larger than *target_bytes*.

    The image is converted to RGB and saved at decreasing JPEG quality; after
    every attempt that is still too large it is also scaled to 85% of its
    width and height. If the whole quality ladder is exhausted, the image
    keeps shrinking at quality 72 until it fits or reaches 1x1 pixels.

    Args:
        path: Source image file.
        target_bytes: Upper bound for the encoded size, in bytes.

    Returns:
        ``(buffer, mime)`` where *buffer* is an ``io.BytesIO`` rewound to the
        start of the JPEG data and *mime* is ``"image/jpeg"``.
    """
    from PIL import ImageOps  # local import: only this function needs it

    def encode(image, quality):
        out = io.BytesIO()
        image.save(out, format="JPEG", quality=quality, optimize=True)
        return out

    def shrink(image):
        w, h = image.size
        return image.resize((max(1, int(w * 0.85)), max(1, int(h * 0.85))))

    with Image.open(path) as src:
        # Re-encoding drops the EXIF orientation tag, so bake the rotation
        # into the pixels first; otherwise phone photos can end up sideways.
        im = ImageOps.exif_transpose(src).convert("RGB")

    for q in (92, 88, 84, 80, 76, 72, 68, 64, 60):
        buf = encode(im, q)
        if buf.tell() <= target_bytes:
            buf.seek(0)
            return buf, "image/jpeg"
        im = shrink(im)

    # Ladder exhausted: keep shrinking so the size target is actually enforced.
    while True:
        buf = encode(im, 72)
        if buf.tell() <= target_bytes or im.size == (1, 1):
            buf.seek(0)
            return buf, "image/jpeg"
        im = shrink(im)
media_buf, mime = prepare_image(img_path)
auth = OAuth1(CK, CS, AT, ATS)

# --- 1) v1.1 media upload ---
# Rate-limit aware, but the helper's default max_retries=0 means a single
# attempt: a 429 here ends the run via raise_for_status().
upload_url = "https://upload.twitter.com/1.1/media/upload.json"
up = post_with_rate_limits(upload_url, auth, files={"media": ("photo.jpg", media_buf, mime)})
print("[UPLOAD]", up.status_code, up.text[:300])
up.raise_for_status()
media_id = up.json()["media_id_string"]

# Optional alt text (the file name is used as the description). Best effort:
# a failure is reported but never blocks the post.
meta_url = "https://upload.twitter.com/1.1/media/metadata/create.json"
try:
    meta = requests.post(
        meta_url,
        auth=auth,
        json={"media_id": media_id, "alt_text": {"text": img_path.name}},
        timeout=30,
    )
    if not meta.ok:
        # Without this check a rejected request would pass unnoticed.
        print("[ALT-TEXT] skipped:", meta.status_code, meta.text[:300])
except Exception as e:
    print("[ALT-TEXT] skipped:", e)
# --- 2) Build final caption ---
# Requirement: "Hello from Peru. Ancient alien tour with Brien Forester."
# followed by the AI description if one is available.
ai_caption = describe_image(img_path)
if ai_caption:
    ai_caption = ai_caption.strip()
if ai_caption:  # re-checked: stripping can leave an empty string
    # Sentence-case the description and make sure it ends with punctuation.
    ai_caption = ai_caption[0].upper() + ai_caption[1:]
    if not ai_caption.endswith(('.', '!', '?')):
        ai_caption += '.'
    status_text = f"Hello from Peru. Ancient alien tour with Brien Forester. {ai_caption}"
else:
    # NOTE(review): the requirement above names the full greeting, yet this
    # fallback posts only the short one (DEFAULT_CAPTION is unused) - confirm.
    status_text = "Hello from Peru."

# --- Add a small timestamp suffix (to avoid duplicate-post rejections) ---
stamp_suffix = " · " + datetime.now().strftime("%H:%M")

# --- Keep within 280 characters ---
# Trim the body, never the suffix, so the timestamp always survives.
max_body_len = 280 - len(stamp_suffix)
if len(status_text) > max_body_len:
    status_text = status_text[:max_body_len - 1] + "…"
status_text += stamp_suffix

# Disabled pacing guard, kept for reference:
# MIN_POST_INTERVAL = 2 * 60 * 60  # 2 hours
# STAMP = Path("/Users/danielhudsky/lab/.last_x_post")
# now = int(time.time())
# if STAMP.exists():
#     last = int(STAMP.read_text().strip() or "0")
#     remain = MIN_POST_INTERVAL - (now - last)
#     if remain > 0:
#         print(f"[PACING] Last post too recent. Try again in {remain}s.")
#         sys.exit(0)
# # ... after a successful tweet:
# STAMP.write_text(str(now))
# --- 3) v2 create tweet (with backoff) ---
tweet_url = "https://api.twitter.com/2/tweets"
payload = {
    "text": status_text,
    "media": {"media_ids": [media_id]},
}
# One retry is allowed, waiting up to 5 minutes if rate-limited.
tw = post_with_rate_limits(tweet_url, auth, json=payload, max_retries=1, max_sleep=300)
print("[TWEET]", tw.status_code, tw.text[:300])

if tw.status_code != 429:
    # Any other error status is raised to the caller.
    tw.raise_for_status()
else:
    # Still throttled after the retry: report it and stop with exit code 0.
    print("[TWEET] Still rate-limited. Exiting cleanly.")
    sys.exit(0)