6 min read·Lesson 6 of 10

Writing Robust Infrastructure Automation

Build automation scripts that are idempotent, retried, observable, and safe to run in production.

Quick scripts that worked once become production tooling that runs every day. The difference between a script you regret and one you trust is a handful of disciplines you apply from the first version.

Idempotency

An idempotent operation produces the same result whether you run it once or ten times. This is the single most important property of infrastructure automation. The pattern: read state, compare, only act if different.

def ensure_bucket_exists(s3, name: str, region: str) -> None:
    """Create the bucket only if it doesn't already exist."""
    try:
        s3.head_bucket(Bucket=name)
        print(f"bucket {name} already exists")
        return
    except s3.exceptions.ClientError as e:
        code = e.response["Error"]["Code"]
        if code != "404":
            raise

    print(f"creating bucket {name}")
    if region == "us-east-1":
        s3.create_bucket(Bucket=name)
    else:
        s3.create_bucket(
            Bucket=name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )

Apply the same pattern for IAM roles, security groups, DNS records, Kubernetes resources — anywhere you might run the script again.
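
The read, compare, act loop can be sketched without any cloud SDK. In the minimal example below, `plan_changes`, `apply_changes`, and the record dictionaries are hypothetical names used only for illustration; a real script would read `current` from the provider's API and replace `apply_changes` with real calls.

```python
from typing import Dict, List, Tuple

Change = Tuple[str, str, str]  # (action, name, value)


def plan_changes(current: Dict[str, str], desired: Dict[str, str]) -> List[Change]:
    """Compare current state with desired state and return only the needed changes."""
    changes: List[Change] = []
    for name, value in sorted(desired.items()):
        if name not in current:
            changes.append(("create", name, value))
        elif current[name] != value:
            changes.append(("update", name, value))
        # Equal values need no action, which is what makes reruns safe.
    return changes


def apply_changes(current: Dict[str, str], changes: List[Change]) -> Dict[str, str]:
    """Apply planned changes to a copy of the state (stands in for real API calls)."""
    updated = dict(current)
    for _action, name, value in changes:
        updated[name] = value
    return updated
```

Planning a second time against the updated state returns an empty list, so a rerun performs no work.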

Dry-Run Mode

Every destructive script should accept --dry-run. In dry-run, you log what would happen but skip the actual change. Catch mistakes before they hit production:

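from datetime import datetime, timedelta, timezone


def delete_old_snapshots(ec2, max_age_days: int, dry_run: bool) -> int:
    """Delete snapshots older than max_age_days; return how many matched."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
    deleted = 0  # in dry-run this counts snapshots that would be deleted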
    paginator = ec2.get_paginator("describe_snapshots")
    for page in paginator.paginate(OwnerIds=["self"]):
        for snap in page["Snapshots"]:
            if snap["StartTime"] < cutoff:
                if dry_run:
                    print(f"DRY-RUN would delete {snap['SnapshotId']}")
                else:
                    ec2.delete_snapshot(SnapshotId=snap["SnapshotId"])
                    print(f"deleted {snap['SnapshotId']}")
                deleted += 1
    return deleted
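
When a script has several destructive steps, one option is to route every mutation through a single helper so the dry-run check lives in one place. The sketch below assumes a hypothetical `Executor` class; it is not part of any library.

```python
import logging
from typing import Callable, List

log = logging.getLogger("automation")


class Executor:
    """Runs mutating actions, or only logs them when dry_run is True."""

    def __init__(self, dry_run: bool) -> None:
        self.dry_run = dry_run
        self.planned: List[str] = []  # description of every action seen

    def run(self, description: str, action: Callable[[], None]) -> bool:
        """Return True if the action was executed, False if it was skipped."""
        self.planned.append(description)
        if self.dry_run:
            log.info("DRY-RUN would %s", description)
            return False
        action()
        log.info("did %s", description)
        return True
```

A call site would then look like `executor.run(f"delete {snap_id}", lambda: ec2.delete_snapshot(SnapshotId=snap_id))`, with no `if dry_run` branch of its own.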

Tagging

Tag every cloud resource you create. At minimum: owner, environment, project, and ticket or change ID. Tags drive cost reporting, cleanup automation, and incident response: they answer "who created this thing?"

tags = [
    {"Key": "Owner", "Value": "platform-team"},
    {"Key": "Environment", "Value": "production"},
    {"Key": "Project", "Value": "billing-pipeline"},
    {"Key": "ManagedBy", "Value": "automation-script-v1"},
]
ec2.create_tags(Resources=[instance_id], Tags=tags)
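
To keep tags consistent across scripts, a small helper can build the Key/Value list and refuse to continue when a required tag is missing. This is a sketch: `build_tags` and `REQUIRED_TAG_KEYS` are hypothetical names, and the required set is an assumption you would adapt to your own tagging policy.

```python
from typing import Dict, List

REQUIRED_TAG_KEYS = ("Owner", "Environment", "Project")  # assumed minimum set


def build_tags(values: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert a plain dict into the Key/Value list format, checking required keys."""
    missing = [key for key in REQUIRED_TAG_KEYS if not values.get(key)]
    if missing:
        raise ValueError(f"missing required tags: {', '.join(missing)}")
    # Sorting keeps the output stable between runs, which makes diffs easier to read.
    return [{"Key": key, "Value": value} for key, value in sorted(values.items())]
```

The returned list has the same shape as the `Tags` argument passed to `ec2.create_tags` above.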

Retries with Exponential Backoff

Cloud APIs throttle. Use tenacity — a clean retry decorator that handles attempts, backoff, jitter, and conditional retry:

pip install tenacity

from botocore.exceptions import ClientError
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential


def is_throttling(exception: BaseException) -> bool:
    """Return True only for throttling errors, which are safe to retry."""
    if isinstance(exception, ClientError):
        return exception.response["Error"]["Code"] in ("Throttling", "RequestLimitExceeded")
    return False


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=30),
    retry=retry_if_exception(is_throttling),  # other ClientErrors fail fast
    reraise=True,
)
def describe_with_retry(ec2):
    return ec2.describe_instances()
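
To make the mechanics concrete, here is a standard-library sketch of capped exponential backoff with full jitter. It is not how tenacity is implemented; `call_with_backoff` and its parameters are hypothetical, and the `sleep` argument exists only so the delay can be replaced in tests.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying retryable errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Permanent errors and the final attempt are re-raised immediately.
            if not is_retryable(exc) or attempt == max_attempts:
                raise
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, ceiling))  # full jitter: anywhere up to the ceiling
    raise RuntimeError("unreachable")
```

Randomizing the delay spreads out retries from many clients that were throttled at the same moment, so they do not all return at once.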

Running Subprocesses

Plenty of automation shells out to kubectl, terraform, aws CLI. Use subprocess.run with a list of arguments — never shell=True with string concatenation:

import json
import subprocess

namespace = "default"  # example value; in practice this comes from a CLI argument

# WRONG — vulnerable to command injection
# subprocess.run(f"kubectl get pods -n {namespace}", shell=True)

# RIGHT
result = subprocess.run(
    ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
    check=True,                    # raises if exit code != 0
    capture_output=True,
    text=True,
    timeout=30,
)
pods = json.loads(result.stdout)

Set check=True to fail loudly. Set timeout on anything that talks to the network.
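
With `check=True` and `timeout` set, `subprocess.run` can raise `CalledProcessError` or `TimeoutExpired`. One way to handle both is a thin wrapper that turns them into a single error carrying the command's stderr. `run_command` and `CommandError` below are hypothetical names for this sketch.

```python
import subprocess
from typing import List


class CommandError(RuntimeError):
    """Raised when an external command fails or times out (hypothetical helper type)."""


def run_command(args: List[str], timeout: float = 30.0) -> str:
    """Run a command without a shell and return its stdout as text."""
    try:
        result = subprocess.run(
            args, check=True, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.CalledProcessError as exc:
        # stderr is available on the exception because capture_output=True.
        raise CommandError(
            f"{args[0]} exited with {exc.returncode}: {exc.stderr.strip()}"
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise CommandError(f"{args[0]} timed out after {timeout}s") from exc
    return result.stdout
```

Callers still fail loudly, but the error message now says which command failed and why.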

Structured Logging

Use the logging module, not print, in anything that runs unattended. Log JSON if shipping to a SIEM:

import logging
import json
import sys

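# Attribute names present on every LogRecord; anything else was passed via extra=...
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "msg": record.getMessage(),
            "logger": record.name,
        }
        # Without this step, fields passed via extra= never reach the output.
        payload.update({k: v for k, v in record.__dict__.items() if k not in _STANDARD_ATTRS})
        return json.dumps(payload, default=str)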

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
log = logging.getLogger("automation")

log.info("starting cleanup", extra={"max_age_days": 30})

Exit Codes

Use exit codes deliberately so CI/CD and orchestrators can tell success from failure:

Code   Meaning
0      Success
1      Generic failure
2      Misuse / bad arguments
3-N    Domain-specific (document them)

def main() -> int:
    try:
        do_work()
        return 0
    except ConfigurationError as e:
        log.error("bad config: %s", e)
        return 2
    except Exception:
        log.exception("unexpected failure")
        return 1

if __name__ == "__main__":
    raise SystemExit(main())
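
Naming the codes in one place keeps them documented and consistent across scripts. The sketch below uses `IntEnum`; the `LOCK_HELD` and `PARTIAL` values are hypothetical examples of domain-specific codes, not a convention.

```python
from enum import IntEnum


class ExitCode(IntEnum):
    SUCCESS = 0
    FAILURE = 1
    USAGE = 2
    LOCK_HELD = 3  # hypothetical: another run is already in progress
    PARTIAL = 4    # hypothetical: some items succeeded, some failed


def exit_code_for(failed: int, total: int) -> ExitCode:
    """Map a batch result to an exit code."""
    if failed == 0:
        return ExitCode.SUCCESS
    if failed < total:
        return ExitCode.PARTIAL
    return ExitCode.FAILURE
```

Because `IntEnum` members are integers, a member can be returned from `main()` and passed straight to `sys.exit` or `SystemExit`.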

Configuration

Don't bury values in code. Resolve each setting from these sources, highest precedence first:

  1. CLI flags (per-run overrides)
  2. Environment variables (per-environment defaults)
  3. Config file (toml/yaml, in source control)
  4. Hardcoded defaults (last resort)

pydantic-settings is excellent for this — typed config built from layered sources.
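
The layering can also be sketched with the standard library alone. The example below is not the pydantic-settings API; `load_config`, the `SNAP_*` environment variable names, and the `file_values` argument (standing in for an already-parsed config file) are assumptions made for illustration.

```python
import argparse
import os
from collections import ChainMap
from typing import Any, Dict, List, Mapping, Optional

DEFAULTS: Dict[str, Any] = {"region": "us-east-1", "max_age_days": 30}


def load_config(
    argv: Optional[List[str]] = None,
    environ: Optional[Mapping[str, str]] = None,
    file_values: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    """Merge CLI flags, environment variables, file values, and defaults, in that order."""
    environ = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()
    parser.add_argument("--region")
    parser.add_argument("--max-age-days", type=int)
    # Flags that were not given parse as None and are dropped so lower layers apply.
    cli = {k: v for k, v in vars(parser.parse_args(argv)).items() if v is not None}

    env: Dict[str, Any] = {}
    if "SNAP_REGION" in environ:  # hypothetical variable names
        env["region"] = environ["SNAP_REGION"]
    if "SNAP_MAX_AGE_DAYS" in environ:
        env["max_age_days"] = int(environ["SNAP_MAX_AGE_DAYS"])

    # ChainMap returns the value from the first mapping that has the key,
    # so the argument order is the precedence order.
    return dict(ChainMap(cli, env, dict(file_values or {}), DEFAULTS))
```

For example, `load_config(["--region", "eu-west-1"])` overrides the region for one run while every other setting still falls through to the environment, the file, or the defaults.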

A Production-Ready Skeleton

"""Cleanup old EBS snapshots."""
import argparse
import logging
import sys
from datetime import datetime, timedelta, timezone

import boto3

log = logging.getLogger("snap-cleanup")


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--max-age-days", type=int, default=30)
    parser.add_argument("--region", default="us-east-1")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    ec2 = boto3.client("ec2", region_name=args.region)
    cutoff = datetime.now(timezone.utc) - timedelta(days=args.max_age_days)

    deleted = 0
    try:
        paginator = ec2.get_paginator("describe_snapshots")
        for page in paginator.paginate(OwnerIds=["self"]):
            for snap in page["Snapshots"]:
                if snap["StartTime"] < cutoff:
                    if args.dry_run:
                        log.info("DRY-RUN delete %s (%s)", snap["SnapshotId"], snap["StartTime"])
                    else:
                        ec2.delete_snapshot(SnapshotId=snap["SnapshotId"])
                        log.info("deleted %s", snap["SnapshotId"])
                    deleted += 1
        log.info("done; %d snapshots %s", deleted, "would be deleted" if args.dry_run else "deleted")
        return 0
    except Exception:
        log.exception("cleanup failed")
        return 1


if __name__ == "__main__":
    sys.exit(main())

That's the shape of automation you can run nightly without losing sleep over it.

Key Takeaways

  • Idempotency: running a script twice should leave the system in the same state.
  • Tag everything you create — owner, environment, ticket — for cost and cleanup.
  • Always implement --dry-run; default to read-only when in doubt.
  • Retry transient errors; fail fast on permanent ones.
  • Use subprocess.run with a list of args, never shell=True with concatenation.
