Quick scripts that worked once become production tooling that runs every day. The difference between a script you regret and one you trust is a handful of disciplines you apply from the first version.
Idempotency
An idempotent operation produces the same result whether you run it once or ten times. This is the single most important property of infrastructure automation. The pattern: read state, compare, only act if different.
def ensure_bucket_exists(s3, name: str, region: str) -> None:
    """Create the bucket only if it doesn't already exist."""
    try:
        s3.head_bucket(Bucket=name)
        print(f"bucket {name} already exists")
        return
    except s3.exceptions.ClientError as e:
        code = e.response["Error"]["Code"]
        # Anything other than "not found" (for example 403) is a real problem: surface it.
        if code != "404":
            raise
    print(f"creating bucket {name}")
    # us-east-1 is the default region and rejects an explicit LocationConstraint.
    if region == "us-east-1":
        s3.create_bucket(Bucket=name)
    else:
        s3.create_bucket(
            Bucket=name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )
Apply the same pattern for IAM roles, security groups, DNS records, Kubernetes resources — anywhere you might run the script again.
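The read, compare, act loop can be separated from any particular SDK. Here is a minimal sketch, assuming resources can be described as plain dicts keyed by name; plan_changes and the sample DNS records are hypothetical, not part of any library:

```python
def plan_changes(current: dict, desired: dict) -> dict:
    """Compare observed state with desired state and list only the needed actions."""
    return {
        # In desired but missing from the observed state.
        "create": sorted(k for k in desired if k not in current),
        # In both, but the values differ.
        "update": sorted(k for k in desired if k in current and current[k] != desired[k]),
        # Already correct: no API call needed.
        "unchanged": sorted(k for k in desired if k in current and current[k] == desired[k]),
    }


# Hypothetical observed and desired DNS records.
current = {"api.example.com": "10.0.0.1", "db.example.com": "10.0.0.2"}
desired = {
    "api.example.com": "10.0.0.1",
    "db.example.com": "10.0.0.9",
    "cache.example.com": "10.0.0.3",
}

plan = plan_changes(current, desired)
print(plan)
```

Running the plan against a state that already matches yields empty create and update lists, which is what makes a second run harmless.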
Dry-Run Mode
Every destructive script should accept --dry-run. In dry-run mode, the script logs what it would do but skips the actual change, so you catch mistakes before they hit production:
from datetime import datetime, timedelta, timezone


def delete_old_snapshots(ec2, max_age_days: int, dry_run: bool) -> int:
    """Delete snapshots older than max_age_days; return how many matched."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
    deleted = 0
    paginator = ec2.get_paginator("describe_snapshots")
    for page in paginator.paginate(OwnerIds=["self"]):
        for snap in page["Snapshots"]:
            if snap["StartTime"] < cutoff:
                if dry_run:
                    print(f"DRY-RUN would delete {snap['SnapshotId']}")
                else:
                    ec2.delete_snapshot(SnapshotId=snap["SnapshotId"])
                    print(f"deleted {snap['SnapshotId']}")
                # Counted in both modes, so a dry run reports what a real run would remove.
                deleted += 1
    return deleted
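One way to make dry-run behaviour easy to test is to split selection from action. The sketch below assumes snapshots arrive as dicts shaped like the describe_snapshots output; select_expired and apply_deletions are hypothetical helper names, and the delete callable stands in for the real API call:

```python
from datetime import datetime, timedelta, timezone


def select_expired(snapshots, cutoff):
    """Pure selection step: no side effects, identical in dry-run and real runs."""
    return [s["SnapshotId"] for s in snapshots if s["StartTime"] < cutoff]


def apply_deletions(snapshot_ids, delete, dry_run):
    """Call delete(snapshot_id) for each ID unless dry_run is set; return the IDs acted on."""
    acted = []
    for snapshot_id in snapshot_ids:
        if dry_run:
            print(f"DRY-RUN would delete {snapshot_id}")
        else:
            delete(snapshot_id)
            acted.append(snapshot_id)
    return acted


# Fake data standing in for an API response.
now = datetime(2024, 1, 31, tzinfo=timezone.utc)
snaps = [
    {"SnapshotId": "snap-old", "StartTime": now - timedelta(days=40)},
    {"SnapshotId": "snap-new", "StartTime": now - timedelta(days=2)},
]
expired = select_expired(snaps, now - timedelta(days=30))

calls = []  # records what the fake delete function was asked to remove
dry_result = apply_deletions(expired, calls.append, dry_run=True)
calls_after_dry_run = list(calls)
real_result = apply_deletions(expired, calls.append, dry_run=False)
```

Because both modes share select_expired, the dry run reports exactly the set a real run would touch.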
Tagging
Tag every cloud resource you create. At minimum: owner, environment, project, ticket / change ID. Tags drive cost reporting, cleanup automation, and incident response — "who created this thing".
tags = [
    {"Key": "Owner", "Value": "platform-team"},
    {"Key": "Environment", "Value": "production"},
    {"Key": "Project", "Value": "billing-pipeline"},
    {"Key": "ManagedBy", "Value": "automation-script-v1"},
]
ec2.create_tags(Resources=[instance_id], Tags=tags)
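A small helper can refuse to proceed when a required tag is missing. This is a sketch under assumptions: the key names in REQUIRED_TAG_KEYS (including "Ticket"), the build_tags function, and the ticket value are illustrative, not an established convention:

```python
# Assumed minimum tag set; adjust the key names to your organisation's policy.
REQUIRED_TAG_KEYS = ("Owner", "Environment", "Project", "Ticket")


def build_tags(values: dict) -> list:
    """Validate required keys, then return tags in the Key/Value list shape used above."""
    missing = [key for key in REQUIRED_TAG_KEYS if not values.get(key)]
    if missing:
        raise ValueError(f"missing required tags: {', '.join(missing)}")
    return [{"Key": key, "Value": str(value)} for key, value in sorted(values.items())]


tags = build_tags({
    "Owner": "platform-team",
    "Environment": "production",
    "Project": "billing-pipeline",
    "Ticket": "CHG-0000",  # placeholder change ID
})
print(tags)
```

Failing before the create call is cheaper than finding untagged resources during a cost review.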
Retries with Exponential Backoff
Cloud APIs throttle. Use tenacity — a clean retry decorator that handles attempts, backoff, jitter, and conditional retry:
pip install tenacity
from botocore.exceptions import ClientError
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_random_exponential


def is_throttling(exception):
    """Return True only for throttling errors, which are safe to retry."""
    if isinstance(exception, ClientError):
        return exception.response["Error"]["Code"] in ("Throttling", "RequestLimitExceeded")
    return False


@retry(
    stop=stop_after_attempt(5),
    # Exponential backoff with random jitter, capped at 30 seconds.
    wait=wait_random_exponential(multiplier=1, max=30),
    # Retry throttling only; other ClientErrors (bad parameters, access denied) fail fast.
    retry=retry_if_exception(is_throttling),
    reraise=True,
)
def describe_with_retry(ec2):
    return ec2.describe_instances()
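To see what a backoff schedule looks like, here is a stdlib-only sketch of capped exponential backoff with full jitter. It illustrates the arithmetic and is not a description of tenacity's internals; backoff_delays and its parameters are hypothetical:

```python
import random


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0, rng=None) -> list:
    """Return the sleep before each retry: exponential ceiling, capped, with full jitter."""
    rng = rng or random.Random()
    delays = []
    for retry_number in range(attempts - 1):  # no sleep after the final attempt
        ceiling = min(cap, base * 2 ** retry_number)
        # Full jitter: pick uniformly in [0, ceiling] so clients do not retry in lockstep.
        delays.append(rng.uniform(0, ceiling))
    return delays


delays = backoff_delays(attempts=5, rng=random.Random(0))
print([round(d, 2) for d in delays])
```

With five attempts there are four sleeps, bounded by 1, 2, 4, and 8 seconds respectively; the cap only matters once the ceiling would exceed 30 seconds.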
Running Subprocesses
Plenty of automation shells out to kubectl, terraform, or the aws CLI. Use subprocess.run with a list of arguments, and never shell=True with string concatenation:
import json
import subprocess

namespace = "default"  # example value; in practice this comes from arguments or config

# WRONG: vulnerable to command injection
# subprocess.run(f"kubectl get pods -n {namespace}", shell=True)

# RIGHT
result = subprocess.run(
    ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
    check=True,  # raises CalledProcessError if exit code != 0
    capture_output=True,
    text=True,
    timeout=30,  # raises TimeoutExpired after 30 seconds
)
pods = json.loads(result.stdout)
Set check=True to fail loudly. Set timeout on anything that talks to the network.
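The difference between list arguments and a shell string is easy to demonstrate without kubectl. In this sketch, run_cmd is a hypothetical wrapper, and the 124 and 127 return values borrow the conventions of timeout(1) and POSIX shells rather than anything subprocess defines:

```python
import subprocess
import sys


def run_cmd(args, timeout=30):
    """Run a command without a shell; return (exit_code, stdout, stderr)."""
    try:
        result = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return 124, "", f"timed out after {timeout}s"
    except FileNotFoundError:
        return 127, "", f"command not found: {args[0]}"
    return result.returncode, result.stdout, result.stderr


# The hostile-looking argument is passed to the child as one literal string,
# because no shell ever parses it.
hostile = "default; echo injected"
code, out, err = run_cmd([sys.executable, "-c", "import sys; print(sys.argv[1])", hostile])
print(code, repr(out))
```

The child process prints the argument back unchanged; with shell=True and string concatenation, the part after the semicolon would have run as a second command.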
Structured Logging
Use the logging module, not print, in anything that runs unattended. Log JSON if shipping to a SIEM:
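import json
import logging
import sys

# Attributes every LogRecord carries; anything else arrived via extra=.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "msg": record.getMessage(),
            "logger": record.name,
        }
        # Include fields passed via extra=, which would otherwise be dropped silently.
        payload.update({k: v for k, v in vars(record).items() if k not in _STANDARD_ATTRS})
        return json.dumps(payload, default=str)


handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
log = logging.getLogger("automation")
log.info("starting cleanup", extra={"max_age_days": 30})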
Exit Codes
Use exit codes deliberately so CI/CD and orchestrators can tell success from failure:
| Code | Meaning |
|------|---------|
| 0 | Success |
| 1 | Generic failure |
| 2 | Misuse / bad arguments |
| 3-N | Domain-specific (document them) |
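class ConfigurationError(Exception):
    """Raised for missing or invalid settings (placeholder for your own exception type)."""


def main() -> int:
    # do_work() and log are assumed to be defined elsewhere in the script.
    try:
        do_work()
        return 0
    except ConfigurationError as e:
        log.error("bad config: %s", e)
        return 2
    except Exception:
        log.exception("unexpected failure")
        return 1


if __name__ == "__main__":
    raise SystemExit(main())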
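Naming the codes keeps them documented in one place. A minimal sketch, assuming a hypothetical domain-specific code 3 for partial success; ExitCode and exit_code_for are illustrative names:

```python
import enum


class ExitCode(enum.IntEnum):
    OK = 0
    FAILURE = 1
    USAGE = 2
    PARTIAL = 3  # hypothetical domain-specific code: some items failed, some succeeded


def exit_code_for(total: int, failed: int) -> ExitCode:
    """Map a batch result onto an exit code an orchestrator can branch on."""
    if failed == 0:
        return ExitCode.OK
    if failed < total:
        return ExitCode.PARTIAL
    return ExitCode.FAILURE


print(int(exit_code_for(total=10, failed=2)))
```

Because IntEnum members are integers, a value such as ExitCode.PARTIAL can be passed straight to SystemExit or sys.exit.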
Configuration
Don't bury values in code. Resolve each setting in this order of precedence, highest first:
- CLI flags (per-run overrides)
- Environment variables (per-environment defaults)
- Config file (toml/yaml, in source control)
- Hardcoded defaults (last resort)
pydantic-settings is excellent for this — typed config built from layered sources.
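The layering can also be sketched with the standard library alone. This assumes the list above is ordered from highest to lowest precedence; load_settings, the CLEANUP_* variable names, and the dict standing in for a parsed config file are all hypothetical:

```python
import argparse

DEFAULTS = {"region": "us-east-1", "max_age_days": 30}


def load_settings(argv, environ, file_config):
    """Merge settings so that CLI flags beat env vars, which beat the file, which beats defaults."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--region")
    parser.add_argument("--max-age-days", type=int)
    args = parser.parse_args(argv)

    settings = dict(DEFAULTS)      # lowest precedence
    settings.update(file_config)   # config file overrides defaults
    if "CLEANUP_REGION" in environ:  # environment overrides the file
        settings["region"] = environ["CLEANUP_REGION"]
    if "CLEANUP_MAX_AGE_DAYS" in environ:
        settings["max_age_days"] = int(environ["CLEANUP_MAX_AGE_DAYS"])
    if args.region is not None:      # CLI flags win
        settings["region"] = args.region
    if args.max_age_days is not None:
        settings["max_age_days"] = args.max_age_days
    return settings


settings = load_settings(
    ["--region", "eu-west-1"],
    {"CLEANUP_REGION": "ap-south-1", "CLEANUP_MAX_AGE_DAYS": "7"},
    {"max_age_days": 14},
)
print(settings)
```

In a real script the three inputs would be sys.argv[1:], os.environ, and the parsed config file; passing them in as arguments keeps the merge logic testable.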
A Production-Ready Skeleton
"""Cleanup old EBS snapshots."""
import argparse
import logging
import sys
from datetime import datetime, timedelta, timezone

import boto3

log = logging.getLogger("snap-cleanup")


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--max-age-days", type=int, default=30)
    parser.add_argument("--region", default="us-east-1")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    ec2 = boto3.client("ec2", region_name=args.region)
    cutoff = datetime.now(timezone.utc) - timedelta(days=args.max_age_days)
    deleted = 0

    try:
        paginator = ec2.get_paginator("describe_snapshots")
        for page in paginator.paginate(OwnerIds=["self"]):
            for snap in page["Snapshots"]:
                if snap["StartTime"] < cutoff:
                    if args.dry_run:
                        log.info("DRY-RUN delete %s (%s)", snap["SnapshotId"], snap["StartTime"])
                    else:
                        ec2.delete_snapshot(SnapshotId=snap["SnapshotId"])
                        log.info("deleted %s", snap["SnapshotId"])
                    deleted += 1
        log.info("done; %d snapshots %s", deleted, "would be deleted" if args.dry_run else "deleted")
        return 0
    except Exception:
        log.exception("cleanup failed")
        return 1


if __name__ == "__main__":
    sys.exit(main())
That's the shape of automation you can run nightly without losing sleep over.