\appendix

Code examples and technical Architecture as Code implementations

Appendix A collects every code example, configuration file, and technical implementation referenced throughout the book. The examples are organised by theme so that readers can quickly locate the implementation that matches their current need.

This appendix acts as a practical reference library for all technical demonstrations in the book. Each code listing is categorised and labelled with backlinks to the relevant chapter so the narrative and executable artefacts stay in sync.

The code examples are grouped into the following categories:

  1. CI/CD pipelines and Architecture as Code automation
  2. Infrastructure as Code (Architecture as Code) – Terraform
  3. Infrastructure as Code (Architecture as Code) – CloudFormation
  4. Automation scripts and tooling
  5. Security and compliance
  6. Testing and validation
  7. Configuration files
  8. Shell scripts and utilities

Each example has a unique identifier in the format [chapter]_CODE_[NUMBER] for easy cross-reference with the main text.
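
A small Python sketch (an illustrative helper, not part of the book's tooling; the file name is an assumption) shows how this identifier convention could be checked mechanically when cross-referencing listings:

# tools/check_listing_ids.py (illustrative helper; not part of the book's tooling)
import re
import sys

# Matches identifiers such as 05_CODE_1 or 13_CODE_A
LISTING_ID = re.compile(r"^\d{2}_CODE_[0-9A-Z]+$")

def invalid_ids(identifiers):
    """Return the identifiers that do not follow the [chapter]_CODE_[NUMBER] convention."""
    return [identifier for identifier in identifiers if not LISTING_ID.match(identifier)]

if __name__ == "__main__":
    problems = invalid_ids(sys.argv[1:])
    for identifier in problems:
        print(f"Invalid listing identifier: {identifier}")
    sys.exit(1 if problems else 0)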

CI/CD Pipelines and Architecture as Code Automation {#cicd-pipelines}

This section contains all CI/CD pipeline examples, GitHub Actions workflows, and automation processes for organisations.

05_CODE_1: GDPR-compliant CI/CD pipeline for organisations {#05_code_1}

Referenced from chapter 5: Automation, DevOps and CI/CD for Architecture as Code

# .github/workflows/architecture-as-code-pipeline.yml
# GDPR-compliant CI/CD pipeline for organisations

name: Architecture as Code Pipeline with GDPR Compliance

on:
  push:
    branches: [main, staging, development]
    paths:
      - 'infrastructure/**'
      - 'modules/**'
  pull_request:
    branches: [main, staging]
    paths:
      - 'infrastructure/**'
      - 'modules/**'

env:
  TF_VERSION: '1.6.0'
  ORGANISATION_NAME: ${{ vars.ORGANISATION_NAME }}
  ENVIRONMENT: ${{ github.ref_name == 'main' && 'production' || github.ref_name }}
  COST_CENTRE: ${{ vars.COST_CENTRE }}
  GDPR_COMPLIANCE_ENABLED: 'true'
  DATA_RESIDENCY: 'EU'
  AUDIT_LOGGING: 'enabled'

jobs:
  gdpr-compliance-check:
    name: GDPR Compliance Validation
    runs-on: ubuntu-latest
    if: contains(github.event.head_commit.message, 'personal-data') || contains(github.event.head_commit.message, 'gdpr')

    steps:
      - name: Check out code
        uses: actions/checkout@v4
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
          fetch-depth: 0

      - name: GDPR data discovery scan
        run: |
          echo "🔍 Scanning for personal data indicators across EU jurisdictions..."

          PERSONAL_DATA_PATTERNS=(
            "national\\s+identity"
            "passport\\s+number"
            "social.*security"
            "tax\\s+identification"
            "vat\\s+number"
            "iban"
            "bic"
            "eori"
            "driver'?s\\s+licence"
            "health\\s+insurance\\s+number"
            "email.*address"
            "phone.*number"
            "date.*of.*birth"
          )

          VIOLATIONS_FOUND=false

          for pattern in "${PERSONAL_DATA_PATTERNS[@]}"; do
            if grep -R -i -E "$pattern" infrastructure/ modules/ 2>/dev/null; then
              echo "⚠️ GDPR WARNING: Potential personal data reference detected for pattern: $pattern"
              VIOLATIONS_FOUND=true
            fi
          done

          if [ "$VIOLATIONS_FOUND" = true ]; then
            echo "❌ GDPR compliance check failed"
            echo "Personal data must not be hard coded in Architecture as Code assets."
            exit 1
          fi

          echo "✅ GDPR compliance check completed successfully"

05_CODE_2: Jenkins pipeline for organisations with GDPR compliance {#05_code_2}

Referenced from chapter 5: Automation, DevOps and CI/CD for Architecture as Code

// jenkins/architecture-as-code-pipeline.groovy
// Jenkins pipeline for organisations with GDPR compliance

pipeline {
    agent any

    parameters {
        choice(
            name: 'ENVIRONMENT',
            choices: ['development', 'staging', 'production'],
            description: 'Target environment for deployment'
        )
        booleanParam(
            name: 'FORCE_DEPLOYMENT',
            defaultValue: false,
            description: 'Force deployment even when warnings are raised (development only)'
        )
        string(
            name: 'COST_CENTER',
            defaultValue: 'CC-IT-001',
            description: 'Cost centre used for financial reporting'
        )
    }

    environment {
        ORGANISATION_NAME = 'example-org'
        AWS_DEFAULT_REGION = 'eu-west-1'
        GDPR_COMPLIANCE = 'enabled'
        DATA_RESIDENCY = 'EU'
        TERRAFORM_VERSION = '1.6.0'
        COST_CURRENCY = 'EUR'
        AUDIT_RETENTION_YEARS = '7'  // Legal requirement for audit retention
    }

    stages {
        stage('Compliance Check') {
            parallel {
                stage('GDPR Data Scan') {
                    steps {
                        script {
                            echo "🔍 Scanning for personal data indicators across EU member states..."

                            def personalDataPatterns = [
                                'national\\s+identity', 'passport\\s+number', 'social.*security',
                                'tax\\s+identification', 'vat\\s+number', 'iban', 'bic',
                                'eori', "driver'?s\\s+licence", 'health\\s+insurance\\s+number',
                                'email.*address', 'phone.*number', 'date.*of.*birth'
                            ]

                            def violations = []

                            personalDataPatterns.each { pattern ->
                                def result = sh(
                                    script: "grep -R -i -E '${pattern}' infrastructure/ modules/ || true",
                                    returnStdout: true
                                ).trim()

                                if (result) {
                                    violations.add("Personal data pattern found for expression: ${pattern}")
                                }
                            }

                            if (violations) {
                                error("GDPR VIOLATION: Potential personal data detected in Architecture as Code assets:\n${violations.join('\n')}")
                            }

                            echo "✅ GDPR data scan completed successfully"
                        }
                    }
                }

                stage('Data Residency Validation') {
                    steps {
                        script {
                            echo "🏔️ Validating data residency requirements..."

                            def allowedRegions = ['eu-west-1', 'eu-central-1', 'eu-west-2']

                            def regionCheck = sh(
                                script: """
                                    grep -R 'region\\s*=' infrastructure/ modules/ | \
                                    grep -v -E '(eu-west-1|eu-central-1|eu-west-2)' || true
                                """,
                                returnStdout: true
                            ).trim()

                            if (regionCheck) {
                                error("DATA RESIDENCY VIOLATION: Non-approved regions detected:\n${regionCheck}")
                            }

                            echo "✅ Data residency requirements satisfied"
                        }
                    }
                }

                stage('Cost Center Validation') {
                    steps {
                        script {
                            echo "💰 Validating cost centre for accounting..."

                            if (!params.COST_CENTER.matches(/CC-[A-Z]{2,}-\d{3}/)) {
                                error("Invalid cost centre format. Use: CC-XX-nnn")
                            }

                            // Validate the cost centre exists in the organisation's systems
                            def validCostCenters = [
                                'CC-IT-001', 'CC-DEV-002', 'CC-OPS-003', 'CC-SEC-004'
                            ]

                            if (!validCostCenters.contains(params.COST_CENTER)) {
                                error("Unknown cost centre: ${params.COST_CENTER}")
                            }

                            echo "✅ Cost centre validated: ${params.COST_CENTER}"
                        }
                    }
                }
            }
        }

        stage('📝 Code Quality Analysis') {
            parallel {
                stage('Terraform Validation') {
                    steps {
                        script {
                            echo "🔧 Terraform syntax and formatting..."

                            // Format check
                            sh "terraform fmt -check -recursive infrastructure/"

                            // Syntax validation
                            dir("infrastructure/environments/${params.ENVIRONMENT}") {
                                sh """
                                    terraform init -backend=false
                                    terraform validate
                                """
                            }

                            echo "✅ Terraform validation completed"
                        }
                    }
                }

                stage('Security Scanning') {
                    steps {
                        script {
                            echo "🔒 Security scan with Checkov..."

                            sh """
                                pip install checkov
                                checkov -d infrastructure/ \
                                    --framework terraform \
                                    --output json \
                                    --output-file checkov-results.json \
                                    --soft-fail
                            """

                            // Analyse critical security issues
                            def results = readJSON file: 'checkov-results.json'
                            def criticalIssues = results.results.failed_checks.findAll { 
                                it.severity == 'CRITICAL' 
                            }

                            if (criticalIssues.size() > 0) {
                                echo "⚠️ Critical security issues found:"
                                criticalIssues.each { issue ->
                                    echo "- ${issue.check_name}: ${issue.file_path}"
                                }

                                if (params.ENVIRONMENT == 'production') {
                                    error("Critical security issues must be resolved before production deployment")
                                }
                            }

                            echo "✅ Security scan completed"
                        }
                    }
                }

                stage('Policy Validation') {
                    steps {
                        script {
                            echo "📋 Validating organisational policies..."

                            // Create OPA policies
                            writeFile file: 'policies/a-tagging.rego', text: """
                                package a.tagging

                                required_tags := [
                                    "Environment", "CostCenter", "Organization", 
                                    "Country", "GDPRCompliant", "DataResidency"
                                ]

                                deny[msg] {
                                    input.resource[resource_type][name]
                                    resource_type != "data"
                                    not input.resource[resource_type][name].tags
                                    msg := sprintf("Resource %s.%s is missing tags", [resource_type, name])
                                }

                                deny[msg] {
                                    input.resource[resource_type][name].tags
                                    required_tag := required_tags[_]
                                    not input.resource[resource_type][name].tags[required_tag]
                                    msg := sprintf("Resource %s.%s is missing required tag: %s", [resource_type, name, required_tag])
                                }
                            """

                            sh """
                                curl -L https://github.com/open-policy-agent/conftest/releases/download/v0.46.0/conftest_0.46.0_Linux_x86_64.tar.gz | tar xz
                                sudo mv conftest /usr/local/bin

                                find infrastructure/ -name "*.tf" -exec conftest verify --policy policies/ {} \\;
                            """

                            echo "✅ Policy validation completed"
                        }
                    }
                }
            }
        }

        stage('💰 Cost Control') {
            steps {
                script {
                    echo "📊 Calculating infrastructure costs in euros..."

                    // Set up Infracost with currency support
                    sh """
                        curl -fsSL https://raw.githubusercontent.com/infracost/infracost/master/scripts/install.sh | sh
                        export PATH=\$PATH:\$HOME/.local/bin

                        cd infrastructure/environments/${params.ENVIRONMENT}
                        terraform init -backend=false

                        infracost breakdown \\
                            --path . \\
                            --currency EUR \\
                            --format json \\
                            --out-file ../../../cost-estimate.json

                        infracost output \\
                            --path ../../../cost-estimate.json \\
                            --format table \\
                            --out-file ../../../cost-summary.txt
                    """

                    // Validate costs against budget thresholds
                    def costData = readJSON file: 'cost-estimate.json'
                    def monthlyCostEUR = costData.totalMonthlyCost as Double

                    def budgetLimits = [
                        'development': 5000,
                        'staging': 15000,
                        'production': 50000
                    ]

                    def maxBudget = budgetLimits[params.ENVIRONMENT] ?: 10000

                    echo "Calculated monthly cost: ${monthlyCostEUR} EUR"
                    echo "Budget for ${params.ENVIRONMENT}: ${maxBudget} EUR"

                    if (monthlyCostEUR > maxBudget) {
                        def overBudget = monthlyCostEUR - maxBudget
                        echo "⚠️ Budget exceeded by ${overBudget} EUR!"

                        if (params.ENVIRONMENT == 'production' && !params.FORCE_DEPLOYMENT) {
                            error("Budget overrun not permitted for production without CFO approval")
                        }
                    }

                    // Generate the cost report
                    def costReport = """
                    # Cost Report - ${env.ORGANISATION_NAME}

                    **Environment:** ${params.ENVIRONMENT}
                    **Date:** ${new Date().format('yyyy-MM-dd HH:mm')} (local time)
                    **Cost centre:** ${params.COST_CENTER}

                    ## Monthly cost
                    - **Total:** ${monthlyCostEUR} EUR
                    - **Budget:** ${maxBudget} EUR
                    - **Status:** ${monthlyCostEUR <= maxBudget ? '✅ Within budget' : '❌ Over budget'}

                    ## Cost breakdown
                    ${readFile('cost-summary.txt')}

                    ## Recommendations
                    - Use Reserved Instances for production workloads
                    - Enable auto-scaling for development environments
                    - Implement scheduled shutdowns for non-critical systems
                    """

                    writeFile file: 'cost-report-a.md', text: costReport
                    archiveArtifacts artifacts: 'cost-report-a.md', fingerprint: true

                    echo "✅ Cost control completed"
                }
            }
        }
    }
}
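
The budget logic in the cost control stage can also be expressed outside Jenkins, for example as a pre-merge check. The sketch below mirrors the Groovy stage under stated assumptions: the script path is hypothetical, the thresholds match the pipeline above, and the input is an Infracost JSON breakdown with a top-level totalMonthlyCost field.

# scripts/check_budget.py (assumed helper mirroring the Jenkins cost control stage)
import json
import sys

# Monthly budget limits in EUR, matching the values used in the pipeline above
BUDGET_LIMITS_EUR = {"development": 5000, "staging": 15000, "production": 50000}

def check_budget(cost_file: str, environment: str) -> int:
    """Return 0 if the Infracost estimate is within budget, 1 otherwise."""
    with open(cost_file) as handle:
        estimate = json.load(handle)

    monthly_cost = float(estimate["totalMonthlyCost"])
    limit = BUDGET_LIMITS_EUR.get(environment, 10000)

    print(f"Estimated monthly cost: {monthly_cost:.2f} EUR (budget {limit} EUR)")
    if monthly_cost > limit:
        print(f"Budget exceeded by {monthly_cost - limit:.2f} EUR")
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(check_budget(sys.argv[1], sys.argv[2]))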

05_CODE_3: Terratest for VPC implementation {#05_code_3}

Referenced from chapter 5: Automation, DevOps and CI/CD for Architecture as Code

// test/a_vpc_test.go
// Terratest suite for VPC implementation with GDPR compliance

package test

import (
    "fmt"
    "strings"
    "testing"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudtrail"
    "github.com/aws/aws-sdk-go/service/ec2"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// EuropeanVPCTestSuite defines the reusable Terratest suite for the VPC implementation
type EuropeanVPCTestSuite struct {
    TerraformOptions *terraform.Options
    AWSSession       *session.Session
    OrganizationName string
    Environment      string
    CostCenter       string
}

// TestEuropeanVPCGDPRCompliance validates GDPR compliance expectations for the VPC implementation
func TestEuropeanVPCGDPRCompliance(t *testing.T) {
    t.Parallel()

    suite := setupEuropeanVPCTest(t, "development")
    defer cleanupEuropeanVPCTest(t, suite)

    // Deploy infrastructure
    terraform.InitAndApply(t, suite.TerraformOptions)

    // Test GDPR compliance requirements
    t.Run("TestVPCFlowLogsEnabled", func(t *testing.T) {
        testVPCFlowLogsEnabled(t, suite)
    })

    t.Run("TestEncryptionAtRest", func(t *testing.T) {
        testEncryptionAtRest(t, suite)
    })

    t.Run("TestDataResidencyEU", func(t *testing.T) {
        testDataResidencyEU(t, suite)
    })

    t.Run("TestAuditLogging", func(t *testing.T) {
        testAuditLogging(t, suite)
    })

    t.Run("TestEuropeanTagging", func(t *testing.T) {
        testEuropeanTagging(t, suite)
    })
}

// setupEuropeanVPCTest prepares the temporary test environment for VPC validation
func setupEuropeanVPCTest(t *testing.T, environment string) *EuropeanVPCTestSuite {
    // Unique test identifier
    uniqueID := strings.ToLower(fmt.Sprintf("test-%d", time.Now().Unix()))
    organizationName := fmt.Sprintf("a-org-%s", uniqueID)

    // Terraform configuration
    terraformOptions := &terraform.Options{
        TerraformDir: "../infrastructure/modules/vpc",
        Vars: map[string]interface{}{
            "organization_name":     organizationName,
            "environment":          environment,
            "cost_center":          "CC-TEST-001",
            "gdpr_compliance":      true,
            "data_residency":       "EU",
            "enable_flow_logs":     true,
            "enable_encryption":    true,
            "audit_logging":        true,
        },
        BackendConfig: map[string]interface{}{
            "bucket": "a-org-terraform-test-state",
            "key":    fmt.Sprintf("test/%s/terraform.tfstate", uniqueID),
            "region": "eu-west-1",
        },
        RetryableTerraformErrors: map[string]string{
            ".*": "Transient error - retrying...",
        },
        MaxRetries:         3,
        TimeBetweenRetries: 5 * time.Second,
    }

    // AWS session for EU West region
    awsSession := session.Must(session.NewSession(&aws.Config{
        Region: aws.String("eu-west-1"),
    }))

    return &EuropeanVPCTestSuite{
        TerraformOptions: terraformOptions,
        AWSSession:       awsSession,
        OrganizationName: organizationName,
        Environment:      environment,
        CostCenter:       "CC-TEST-001",
    }
}

// testVPCFlowLogsEnabled validates that VPC Flow Logs are enabled for GDPR compliance
func testVPCFlowLogsEnabled(t *testing.T, suite *EuropeanVPCTestSuite) {
    // Fetch the VPC ID from Terraform output
    vpcID := terraform.Output(t, suite.TerraformOptions, "vpc_id")
    require.NotEmpty(t, vpcID, "VPC ID should not be empty")

    // AWS EC2 client
    ec2Client := ec2.New(suite.AWSSession)

    // Check Flow Logs configuration
    flowLogsInput := &ec2.DescribeFlowLogsInput{
        Filters: []*ec2.Filter{
            {
                Name:   aws.String("resource-id"),
                Values: []*string{aws.String(vpcID)},
            },
        },
    }

    flowLogsOutput, err := ec2Client.DescribeFlowLogs(flowLogsInput)
    require.NoError(t, err, "Failed to describe VPC flow logs")

    // Validate that VPC Flow Logs are enabled
    assert.Greater(t, len(flowLogsOutput.FlowLogs), 0, "VPC Flow Logs should be enabled for GDPR compliance")

    for _, flowLog := range flowLogsOutput.FlowLogs {
        assert.Equal(t, "Active", *flowLog.FlowLogStatus, "Flow log should be active")
        assert.Equal(t, "ALL", *flowLog.TrafficType, "Flow log should capture all traffic for compliance")
    }

    t.Logf("✅ VPC Flow Logs enabled for GDPR compliance: %s", vpcID)
}

// testEncryptionAtRest validates that all storage is encrypted according to GDPR requirements
func testEncryptionAtRest(t *testing.T, suite *EuropeanVPCTestSuite) {
    // Get KMS key from Terraform output
    kmsKeyArn := terraform.Output(t, suite.TerraformOptions, "kms_key_arn")
    require.NotEmpty(t, kmsKeyArn, "KMS key ARN should not be empty")

    // Validate that KMS key is from EU West region
    assert.Contains(t, kmsKeyArn, "eu-west-1", "KMS key should be in EU West region for data residency")

    t.Logf("✅ Encryption at rest validated for GDPR compliance")
}

// testDataResidencyEU validates that all infrastructure is within EU borders
func testDataResidencyEU(t *testing.T, suite *EuropeanVPCTestSuite) {
    // Validate that VPC is in EU region
    vpcID := terraform.Output(t, suite.TerraformOptions, "vpc_id")

    ec2Client := ec2.New(suite.AWSSession)

    vpcOutput, err := ec2Client.DescribeVpcs(&ec2.DescribeVpcsInput{
        VpcIds: []*string{aws.String(vpcID)},
    })
    require.NoError(t, err, "Failed to describe VPC")
    require.Len(t, vpcOutput.Vpcs, 1, "Should find exactly one VPC")

    // Check region from session config
    region := *suite.AWSSession.Config.Region
    allowedRegions := []string{"eu-west-1", "eu-central-1", "eu-west-2"}

    regionAllowed := false
    for _, allowedRegion := range allowedRegions {
        if region == allowedRegion {
            regionAllowed = true
            break
        }
    }

    assert.True(t, regionAllowed, "VPC must be in EU region for data residency. Found: %s", region)

    t.Logf("✅ Data residency validated - all infrastructure in EU region: %s", region)
}

// testAuditLogging validates that audit logging is configured according to legal requirements
func testAuditLogging(t *testing.T, suite *EuropeanVPCTestSuite) {
    // Check the CloudTrail configuration matches organisational expectations
    cloudtrailClient := cloudtrail.New(suite.AWSSession)

    trails, err := cloudtrailClient.DescribeTrails(&cloudtrail.DescribeTrailsInput{})
    require.NoError(t, err, "Failed to list CloudTrail trails")

    foundOrgTrail := false
    for _, trail := range trails.TrailList {
        if strings.Contains(*trail.Name, suite.OrganizationName) {
            foundOrgTrail = true
            t.Logf("✅ CloudTrail audit logging configured: %s", *trail.Name)
        }
    }

    assert.True(t, foundOrgTrail, "Organization CloudTrail should exist for audit logging")
}

// testEuropeanTagging confirms that all resources expose the expected governance tags
func testEuropeanTagging(t *testing.T, suite *EuropeanVPCTestSuite) {
    requiredTags := []string{
        "Environment", "Organization", "CostCenter", 
        "Country", "GDPRCompliant", "DataResidency",
    }

    expectedTagValues := map[string]string{
        "Environment":     suite.Environment,
        "Organization":    suite.OrganizationName,
        "CostCenter":      suite.CostCenter,
        "Country":         "EU",
        "GDPRCompliant":   "true",
        "DataResidency":   "EU",
    }

    // Test VPC tags
    vpcID := terraform.Output(t, suite.TerraformOptions, "vpc_id")
    ec2Client := ec2.New(suite.AWSSession)

    vpcTags, err := ec2Client.DescribeTags(&ec2.DescribeTagsInput{
        Filters: []*ec2.Filter{
            {
                Name:   aws.String("resource-id"),
                Values: []*string{aws.String(vpcID)},
            },
        },
    })
    require.NoError(t, err, "Failed to describe VPC tags")

    // Convert the returned tags into a map for simpler validation
    vpcTagMap := make(map[string]string)
    for _, tag := range vpcTags.Tags {
        vpcTagMap[*tag.Key] = *tag.Value
    }

    // Validate mandatory governance tags
    for _, requiredTag := range requiredTags {
        assert.Contains(t, vpcTagMap, requiredTag, "VPC should have required tag: %s", requiredTag)

        if expectedValue, exists := expectedTagValues[requiredTag]; exists {
            assert.Equal(t, expectedValue, vpcTagMap[requiredTag], 
                "Tag %s should have correct value", requiredTag)
        }
    }

    t.Logf("✅ Tagging validated for all resources")
}

// cleanupEuropeanVPCTest removes the Terraform deployment after the tests complete
func cleanupEuropeanVPCTest(t *testing.T, suite *EuropeanVPCTestSuite) {
    terraform.Destroy(t, suite.TerraformOptions)
    t.Logf("✅ Test environment removed for %s", suite.OrganizationName)
}

Infrastructure as Code – CloudFormation {#cloudformation-architecture-as-code}

Architecture as Code principles in this area emphasise resilient AWS foundations and strong governance.

This section contains CloudFormation templates for AWS infrastructure adapted for organisations.

07_CODE_1: VPC Setup for organisations with GDPR compliance {#07_code_1}

Referenced from Chapter 7: Containerisation and Orchestration as Code

# cloudformation/a-org-vpc.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'VPC setup for organisations with GDPR compliance'

Parameters:
  EnvironmentType:
    Type: String
    Default: development
    AllowedValues: [development, staging, production]
    Description: 'Environment type for deployment'

  DataClassification:
    Type: String
    Default: internal
    AllowedValues: [public, internal, confidential, restricted]
    Description: 'Data classification according to security standards'

  GDPRCompliance:
    Type: String
    Default: 'required'
    AllowedValues: [required, not-required]
    Description: 'Whether GDPR compliance controls are required'

  ISO27001Compliance:
    Type: String
    Default: 'required'
    AllowedValues: [required, not-required]
    Description: 'Whether ISO 27001 compliance controls are required'

# Fn::Contains is not available in the Conditions section, so each compliance
# requirement is modelled as its own parameter and compared with Fn::Equals.
Conditions:
  IsProduction: !Equals [!Ref EnvironmentType, production]
  RequiresGDPR: !Equals [!Ref GDPRCompliance, required]
  RequiresISO27001: !Equals [!Ref ISO27001Compliance, required]

Resources:
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !If [IsProduction, '10.0.0.0/16', '10.1.0.0/16']
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-vpc'
        - Key: Environment
          Value: !Ref EnvironmentType
        - Key: DataClassification
          Value: !Ref DataClassification
        - Key: GDPRCompliant
          Value: !If [RequiresGDPR, 'true', 'false']
        - Key: ISO27001Compliant
          Value: !If [RequiresISO27001, 'true', 'false']
        - Key: Country
          Value: 'EU'
        - Key: Region
          Value: 'eu-west-1'
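
Before deployment the template can be checked programmatically. The following Python sketch is illustrative only (it assumes boto3 with valid AWS credentials and the file path shown above); it submits the template to the CloudFormation ValidateTemplate API and confirms that the governance-relevant parameters are declared.

# scripts/validate_cfn_template.py (illustrative pre-deployment check, assuming boto3 credentials)
import sys

import boto3

# Parameters the organisation expects every template to declare
REQUIRED_PARAMETERS = {"EnvironmentType", "DataClassification"}

def validate(template_path: str, region: str = "eu-west-1") -> int:
    """Validate the template and confirm the expected parameters are declared."""
    client = boto3.client("cloudformation", region_name=region)
    with open(template_path) as handle:
        response = client.validate_template(TemplateBody=handle.read())

    declared = {parameter["ParameterKey"] for parameter in response.get("Parameters", [])}
    missing = REQUIRED_PARAMETERS - declared
    if missing:
        print(f"Template is missing expected parameters: {sorted(missing)}")
        return 1

    print("Template validated and all expected parameters declared")
    return 0

if __name__ == "__main__":
    sys.exit(validate(sys.argv[1]))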

Automation Scripts {#automation-scripts}

This section contains Python scripts and other automation tooling for Architecture as Code operations.

22_CODE_1: Comprehensive test framework for Architecture as Code {#22_code_1}

Architecture as Code principles within this area emphasise automated validation and transparent feedback.

Referenced from chapter 24: Architecture as Code Best Practices and Lessons Learned

# testing/comprehensive_iac_testing.py
import pytest
import boto3
import json
import yaml
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class TestCase:
    name: str
    description: str
    test_type: str
    severity: str
    expected_result: Any
    actual_result: Any = None
    status: str = "pending"
    execution_time: float = 0.0

class ComprehensiveIaCTesting:
    """
    Comprehensive testing framework for Infrastructure as Code.
    Based on Architecture as Code best practices and international standards.
    """

    def __init__(self, region='eu-west-1'):
        self.region = region
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.iam = boto3.client('iam', region_name=region)
        self.test_results = []

    def test_infrastructure_security(self, stack_name: str) -> List[TestCase]:
        """Test comprehensive security configuration"""

        security_tests = [
            self._test_encryption_at_rest(),
            self._test_encryption_in_transit(),
            self._test_vpc_flow_logs(),
            self._test_security_groups(),
            self._test_iam_policies(),
            self._test_s3_bucket_policies(),
            self._test_rds_security()
        ]

        return security_tests

    def _test_encryption_at_rest(self) -> TestCase:
        """Verify all storage resources use encryption at rest"""
        test = TestCase(
            name="Encryption at Rest Validation",
            description="Verify all storage uses encryption",
            test_type="security",
            severity="high",
            expected_result="All storage encrypted"
        )

        try:
            # Test S3 bucket encryption
            buckets = self.s3.list_buckets()['Buckets']
            unencrypted_buckets = []

            for bucket in buckets:
                bucket_name = bucket['Name']
                try:
                    encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
                    if not encryption.get('ServerSideEncryptionConfiguration'):
                        unencrypted_buckets.append(bucket_name)
                except self.s3.exceptions.ClientError:
                    unencrypted_buckets.append(bucket_name)

            if unencrypted_buckets:
                test.status = "failed"
                test.actual_result = f"Unencrypted buckets: {unencrypted_buckets}"
            else:
                test.status = "passed"
                test.actual_result = "All S3 buckets encrypted"

        except Exception as e:
            test.status = "error"
            test.actual_result = f"Test error: {str(e)}"

        return test
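
A minimal usage sketch follows. It assumes the remaining private test methods referenced in test_infrastructure_security are implemented in the same module, and the stack name is a placeholder.

# testing/run_security_tests.py (illustrative usage; stack name is a placeholder)
from comprehensive_iac_testing import ComprehensiveIaCTesting

def main() -> int:
    framework = ComprehensiveIaCTesting(region="eu-west-1")
    results = framework.test_infrastructure_security("a-org-platform-stack")

    for test in results:
        print(f"[{test.status.upper()}] {test.name}: {test.actual_result}")

    failed = [test for test in results if test.status in ("failed", "error")]
    return 1 if failed else 0

if __name__ == "__main__":
    raise SystemExit(main())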

Configuration Files {#configuration}

This section contains configuration files for different tools and services.

22_CODE_2: Governance policy configuration for organisations {#22_code_2}

Referenced from chapter 24: Best Practices and Lessons Learned

# governance/a-governance-policy.yaml
governance_framework:
  organization: "Organization AB"
  compliance_standards: ["GDPR", "ISO27001", "SOC2"]
  data_residency: "EU"
  regulatory_authority: "Integritetsskyddsmyndigheten (IMY)"

policy_enforcement:
  automated_checks:
    pre_deployment:
      - "cost_estimation"
      - "security_scanning"
      - "compliance_validation"
      - "resource_tagging"

    post_deployment:
      - "security_monitoring"
      - "cost_monitoring"
      - "performance_monitoring"
      - "compliance_auditing"

  manual_approvals:
    production_deployments:
      approvers: ["Tech Lead", "Security Team", "Compliance Officer"]
      criteria:
        - "Security review completed"
        - "Cost impact assessed"
        - "GDPR compliance verified"
        - "Business stakeholder approval"

    emergency_changes:
      approvers: ["Incident Commander", "Security Lead"]
      max_approval_time: "30 minutes"
      post_incident_review: "required"

cost_governance:
  budget_controls:
    development: 
      monthly_limit: "10000 EUR"
      alert_threshold: "80%"
      auto_shutdown: "enabled"

    staging:
      monthly_limit: "25000 EUR"
      alert_threshold: "85%"
      auto_shutdown: "disabled"

    production:
      monthly_limit: "100000 EUR"
      alert_threshold: "90%"
      auto_shutdown: "disabled"
      escalation: "immediate"

security_policies:
  data_protection:
    encryption:
      at_rest: "mandatory"
      in_transit: "mandatory"
      key_management: "AWS KMS with customer managed keys"

    access_control:
      principle: "least_privilege"
      mfa_required: true
      session_timeout: "8 hours"
      privileged_access_review: "quarterly"

    monitoring:
      security_events: "all_logged"
      anomaly_detection: "enabled"
      incident_response: "24/7"
      retention_period: "7 years"

compliance_monitoring:
  gdpr_requirements:
    data_mapping: "automated"
    consent_management: "integrated"
    right_to_erasure: "implemented"
    data_breach_notification: "automated"

  audit_requirements:
    frequency: "quarterly"
    scope: "all_infrastructure"
    external_auditor: "required_annually"
    evidence_collection: "automated"

Chapter 13: Testing Strategies Reference Implementations {#chapter-13-testing}

This section contains comprehensive code examples referenced in Chapter 13: Testing Strategies for Infrastructure as Code.

13_CODE_A: Vitest Configuration for Infrastructure as Code Projects {#13_code_a}

Listing 13-A. Referenced from Chapter 13: Testing Strategies

This configuration demonstrates how to set up Vitest for testing infrastructure configuration generators and validation scripts.

// vitest.config.ts
import { defineConfig } from 'vitest/config';
import path from 'path';

export default defineConfig({
  test: {
    // Use globals to avoid imports in each test file
    globals: true,

    // Test environment (node for infrastructure tooling)
    environment: 'node',

    // Coverage configuration
    coverage: {
      provider: 'v8',
      reporter: ['text', 'json', 'html'],
      exclude: [
        'node_modules/',
        'dist/',
        '**/*.config.ts',
        '**/types/**',
      ],
      // Require at least 80% coverage for infrastructure code
      lines: 80,
      functions: 80,
      branches: 80,
      statements: 80,
    },

    // Test timeout for infrastructure operations
    testTimeout: 30000,

    // Include test files
    include: ['**/*.{test,spec}.{js,mjs,cjs,ts,mts,cts}'],

    // Exclude patterns
    exclude: [
      'node_modules',
      'dist',
      '.terraform',
      '**/*.d.ts',
    ],
  },

  resolve: {
    alias: {
      '@': path.resolve(__dirname, './src'),
      '@infra': path.resolve(__dirname, './infrastructure'),
    },
  },
});

13_CODE_B: Terraform Configuration Generator {#13_code_b}

Listing 13-B. Referenced from Chapter 13: Testing Strategies

This TypeScript module demonstrates programmatic generation of Terraform configurations with built-in compliance validation.

// src/generators/terraform-config.ts
export interface TerraformConfig {
  provider: string;
  region: string;
  environment: string;
  resources: ResourceConfig[];
}

export interface ResourceConfig {
  type: string;
  name: string;
  properties: Record<string, any>;
}

export class TerraformConfigGenerator {
  generateVPCConfig(
    environment: string,
    region: string = 'eu-west-1'
  ): TerraformConfig {
    // Validate EU regions for GDPR compliance
    const euRegions = ['eu-west-1', 'eu-central-1', 'eu-west-2'];
    if (!euRegions.includes(region)) {
      throw new Error('Region must be within EU for GDPR compliance');
    }

    return {
      provider: 'aws',
      region,
      environment,
      resources: [
        {
          type: 'aws_vpc',
          name: `vpc-${environment}`,
          properties: {
            cidr_block: '10.0.0.0/16',
            enable_dns_hostnames: true,
            enable_dns_support: true,
            tags: {
              Name: `vpc-${environment}`,
              Environment: environment,
              ManagedBy: 'Terraform',
              GdprCompliant: 'true',
              DataResidency: 'EU',
            },
          },
        },
      ],
    };
  }

  generateRDSConfig(
    environment: string,
    instanceClass: string = 'db.t3.micro',
    encrypted: boolean = true
  ): ResourceConfig {
    // Ensure encryption for production
    if (environment === 'production' && !encrypted) {
      throw new Error('Production databases must have encryption enabled');
    }

    return {
      type: 'aws_db_instance',
      name: `rds-${environment}`,
      properties: {
        allocated_storage: environment === 'production' ? 100 : 20,
        engine: 'postgres',
        engine_version: '14.7',
        instance_class: instanceClass,
        storage_encrypted: encrypted,
        backup_retention_period: environment === 'production' ? 30 : 7,
        multi_az: environment === 'production',
        tags: {
          Environment: environment,
          GdprCompliant: 'true',
          EncryptionEnabled: encrypted.toString(),
        },
      },
    };
  }
}

13_CODE_C: Terraform Configuration Generator Tests {#13_code_c}

Listing 13-C. Referenced from Chapter 13: Testing Strategies

Comprehensive Vitest suite validating the Terraform configuration generator with GDPR compliance checks and regional restrictions.

// src/generators/terraform-config.test.ts
import { describe, expect, it } from 'vitest';
import { TerraformConfigGenerator } from './terraform-config';

describe('TerraformConfigGenerator', () => {
  const generator = new TerraformConfigGenerator();

  it('creates a GDPR-compliant VPC for production in the EU', () => {
    const config = generator.generateVPCConfig('production', 'eu-west-2');

    expect(config.region).toBe('eu-west-2');
    expect(config.resources[0].properties.tags.GdprCompliant).toBe('true');
    expect(config.resources[0].properties.tags.DataResidency).toBe('EU');
  });

  it('refuses to generate VPCs outside approved EU regions', () => {
    expect(() => generator.generateVPCConfig('production', 'us-east-1')).toThrowError(
      'Region must be within EU for GDPR compliance'
    );
  });

  it('enforces encryption for production databases', () => {
    expect(() => generator.generateRDSConfig('production', 'db.r6g.large', false)).toThrowError(
      'Production databases must have encryption enabled'
    );

    const resource = generator.generateRDSConfig('production', 'db.r6g.large', true);
    expect(resource.properties.storage_encrypted).toBe(true);
    expect(resource.properties.multi_az).toBe(true);
    expect(resource.properties.tags.EncryptionEnabled).toBe('true');
  });

  it('creates leaner development databases whilst retaining encryption', () => {
    const resource = generator.generateRDSConfig('development');

    expect(resource.properties.allocated_storage).toBe(20);
    expect(resource.properties.multi_az).toBe(false);
    expect(resource.properties.storage_encrypted).toBe(true);
  });
});

13_CODE_D: Infrastructure Validator {#13_code_d}

Listing 13-D. Referenced from Chapter 13: Testing Strategies

Infrastructure validation module that checks resources against organisational policies and compliance requirements.

// src/validators/infrastructure-validator.ts
export interface ResourceTag {
  key: string;
  value: string;
}

export interface SecurityRule {
  protocol: 'tcp' | 'udp' | 'icmp';
  port: number;
  cidr: string;
}

export interface ClassifiedResource {
  id: string;
  classification: 'public' | 'internal' | 'confidential';
  encrypted: boolean;
}

export interface ResourceDefinition {
  id: string;
  type: string;
  tags: ResourceTag[];
  securityRules?: SecurityRule[];
  data?: ClassifiedResource;
}

export interface ValidationResult {
  id: string;
  errors: string[];
  warnings: string[];
}

export class InfrastructureValidator {
  constructor(private readonly mandatoryTags: string[]) {}

  validate(resources: ResourceDefinition[]): ValidationResult[] {
    return resources.map((resource) => ({
      id: resource.id,
      errors: [
        ...this.validateMandatoryTags(resource),
        ...this.validateSecurityRules(resource),
        ...this.validateDataClassification(resource),
      ],
      warnings: this.collectWarnings(resource),
    }));
  }

  private validateMandatoryTags(resource: ResourceDefinition): string[] {
    const presentKeys = resource.tags.map((tag) => tag.key);
    return this.mandatoryTags
      .filter((tag) => !presentKeys.includes(tag))
      .map((tag) => `Missing mandatory tag: ${tag}`);
  }

  private validateSecurityRules(resource: ResourceDefinition): string[] {
    if (!resource.securityRules?.length) {
      return [];
    }

    return resource.securityRules
      .filter((rule) => rule.protocol === 'tcp' && rule.port === 22 && rule.cidr === '0.0.0.0/0')
      .map(() => 'SSH must not be open to the internet');
  }

  private validateDataClassification(resource: ResourceDefinition): string[] {
    if (!resource.data) {
      return [];
    }

    if (resource.data.classification === 'confidential' && !resource.data.encrypted) {
      return ['Confidential resources must be encrypted'];
    }

    return [];
  }

  private collectWarnings(resource: ResourceDefinition): string[] {
    if (!resource.securityRules?.length) {
      return ['No security rules defined; ensure defence-in-depth controls exist'];
    }

    const usesWildcard = resource.securityRules.some((rule) => rule.cidr === '0.0.0.0/0');
    return usesWildcard ? ['Wildcard CIDR detected; confirm zero-trust posture'] : [];
  }
}

13_CODE_E: Infrastructure Validator Tests {#13_code_e}

Listing 13-E. Referenced from Chapter 13: Testing Strategies

Comprehensive Vitest suite covering mandatory tags, security groups and compliance policies.

// src/validators/infrastructure-validator.test.ts
import { describe, expect, it } from 'vitest';
import { InfrastructureValidator } from './infrastructure-validator';

const validator = new InfrastructureValidator(['Environment', 'CostCentre', 'Owner']);

describe('InfrastructureValidator', () => {
  it('flags resources missing mandatory tags', () => {
    const [result] = validator.validate([
      {
        id: 'vpc-001',
        type: 'aws_vpc',
        tags: [
          { key: 'Environment', value: 'production' },
          { key: 'Owner', value: 'platform-team' },
        ],
      },
    ]);

    expect(result.errors).toContain('Missing mandatory tag: CostCentre');
  });

  it('detects internet-facing SSH rules', () => {
    const [result] = validator.validate([
      {
        id: 'sg-123',
        type: 'aws_security_group',
        tags: [
          { key: 'Environment', value: 'staging' },
          { key: 'CostCentre', value: 'IT-042' },
          { key: 'Owner', value: 'security-team' },
        ],
        securityRules: [
          { protocol: 'tcp', port: 22, cidr: '0.0.0.0/0' },
        ],
      },
    ]);

    expect(result.errors).toContain('SSH must not be open to the internet');
    expect(result.warnings).toContain('Wildcard CIDR detected; confirm zero-trust posture');
  });

  it('enforces encryption for confidential data sets', () => {
    const [result] = validator.validate([
      {
        id: 's3-logs',
        type: 'aws_s3_bucket',
        tags: [
          { key: 'Environment', value: 'production' },
          { key: 'CostCentre', value: 'FIN-001' },
          { key: 'Owner', value: 'risk-office' },
        ],
        data: {
          id: 'audit-logs',
          classification: 'confidential',
          encrypted: false,
        },
      },
    ]);

    expect(result.errors).toContain('Confidential resources must be encrypted');
  });

  it('passes compliant resources without errors', () => {
    const [result] = validator.validate([
      {
        id: 'db-analytics',
        type: 'aws_rds_instance',
        tags: [
          { key: 'Environment', value: 'production' },
          { key: 'CostCentre', value: 'DATA-007' },
          { key: 'Owner', value: 'data-platform' },
        ],
        securityRules: [
          { protocol: 'tcp', port: 5432, cidr: '10.0.0.0/16' },
        ],
        data: {
          id: 'analytics',
          classification: 'internal',
          encrypted: true,
        },
      },
    ]);

    expect(result.errors).toHaveLength(0);
    expect(result.warnings).toHaveLength(0);
  });
});

13_CODE_F: GitHub Actions Vitest Workflow {#13_code_f}

Listing 13-F. Referenced from Chapter 13: Testing Strategies

CI/CD workflow demonstrating automated infrastructure code testing with coverage reporting.

# .github/workflows/infrastructure-vitest.yml
name: Infrastructure Code Tests

on:
  pull_request:
    paths:
      - 'src/**'
      - 'package.json'
      - 'pnpm-lock.yaml'
      - 'vitest.config.ts'

jobs:
  vitest:
    runs-on: ubuntu-latest

    steps:
      - name: Check out repository
        uses: actions/checkout@v4

      - name: Enable Corepack for pnpm
        run: corepack enable

      - name: Use Node.js 20
        uses: actions/setup-node@v4
        with:
          node-version: 20
          cache: 'pnpm'

      - name: Install dependencies
        run: pnpm install --frozen-lockfile

      - name: Run Vitest with coverage
        run: pnpm vitest run --coverage --reporter=default --reporter=junit --outputFile=reports/junit.xml

      - name: Upload coverage report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: vitest-coverage
          path: coverage/

      - name: Summarise test results
        if: always()
        uses: dorny/test-reporter@v1
        with:
          name: Vitest
          path: reports/junit.xml
          reporter: java-junit

13_CODE_G: Requirements as Code Definition {#13_code_g}

Listing 13-G. Referenced from Chapter 13: Testing Strategies

YAML-based requirements definition enabling traceability from business requirements to automated tests with compliance mapping and test specifications.

# requirements/catalogue.yaml
metadata:
  version: 1
  generated: 2024-04-15
  owner: platform-risk-office

requirements:
  - id: SEC-001
    title: Encrypt customer data at rest
    priority: high
    classification: gdpr
    rationale: Protect personally identifiable information across EU jurisdictions.
    tests:
      - id: test-encryption-s3
        description: Validate that S3 buckets containing GDPR data enable default encryption.
        tooling: opa
      - id: test-encryption-rds
        description: Confirm RDS instances with GDPR tags have storage encryption enabled.
        tooling: terratest

  - id: PERF-003
    title: Maintain API latency under 250ms at p95
    priority: medium
    classification: service-level
    rationale: Preserve customer experience during peak trading windows.
    tests:
      - id: test-load-run
        description: Execute k6 load test to validate latency thresholds.
        tooling: k6
      - id: test-auto-scaling
        description: Verify auto-scaling triggers at 70% CPU utilisation.
        tooling: terraform

  - id: GOV-010
    title: Ensure ownership metadata is present for every resource
    priority: high
    classification: governance
    rationale: Support cost allocation and on-call routing.
    tests:
      - id: test-tag-enforcement
        description: Confirm mandatory tags (Environment, CostCentre, Owner) exist on provisioned resources.
        tooling: policy-as-code

13_CODE_H: Requirements Validation Framework {#13_code_h}

Listing 13-H. Referenced from Chapter 13: Testing Strategies

Python framework for automated validation of requirements against Infrastructure as Code implementations, including test execution and compliance coverage reporting.

# requirements/validator.py
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List

import yaml


@dataclass
class Requirement:
    id: str
    title: str
    classification: str
    priority: str
    tests: List[Dict[str, str]]
    rationale: str = ""  # optional narrative field carried in the catalogue


class RequirementValidator:
    def __init__(self, catalogue: Iterable[Requirement]):
        self.catalogue = list(catalogue)

    @classmethod
    def from_file(cls, path: Path) -> 'RequirementValidator':
        data = yaml.safe_load(path.read_text())
        requirements = [Requirement(**item) for item in data['requirements']]
        return cls(requirements)

    def coverage_summary(self, executed_tests: Iterable[str]) -> Dict[str, float]:
        executed = set(executed_tests)
        total = len(self.catalogue)
        covered = sum(1 for requirement in self.catalogue if self._is_covered(requirement, executed))
        return {
            'total_requirements': total,
            'covered_requirements': covered,
            'coverage_percentage': round((covered / total) * 100, 2) if total else 100.0,
        }

    def missing_tests(self, executed_tests: Iterable[str]) -> Dict[str, List[str]]:
        executed = set(executed_tests)
        missing: Dict[str, List[str]] = {}
        for requirement in self.catalogue:
            outstanding = [test['id'] for test in requirement.tests if test['id'] not in executed]
            if outstanding:
                missing[requirement.id] = outstanding
        return missing

    @staticmethod
    def _is_covered(requirement: Requirement, executed: set[str]) -> bool:
        return all(test['id'] in executed for test in requirement.tests)


def load_validator(catalogue_path: str) -> RequirementValidator:
    return RequirementValidator.from_file(Path(catalogue_path))
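
A short usage sketch, assuming the catalogue from 13_CODE_G and a list of test identifiers reported by a CI run:

# requirements/example_usage.py (illustrative; executed test IDs would come from CI results)
from validator import load_validator

validator = load_validator("requirements/catalogue.yaml")
executed = ["test-encryption-s3", "test-encryption-rds", "test-tag-enforcement"]

# SEC-001 and GOV-010 are fully covered, PERF-003 is not
print(validator.coverage_summary(executed))
print(validator.missing_tests(executed))  # {'PERF-003': ['test-load-run', 'test-auto-scaling']}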

13_CODE_I: Terratest for GDPR-Compliant Infrastructure {#13_code_i}

Listing 13-I. Referenced from Chapter 13: Testing Strategies

Comprehensive Terratest example demonstrating testing of Terraform infrastructure with GDPR compliance validation, data residency requirements and organisational tagging standards for regulated environments.

// test/terraform_gdpr_test.go
package test

import (
    "testing"

    "github.com/gruntwork-io/terratest/modules/aws"
    "github.com/gruntwork-io/terratest/modules/terraform"
    "github.com/stretchr/testify/require"
)

func TestPlatformStackGDPRCompliance(t *testing.T) {
    t.Parallel()

    terraformOptions := &terraform.Options{
        TerraformDir: "../infrastructure",
        Vars: map[string]interface{}{
            "environment": "staging",
            "region":       "eu-west-1",
        },
    }

    defer terraform.Destroy(t, terraformOptions)
    terraform.InitAndApply(t, terraformOptions)

    vpcID := terraform.Output(t, terraformOptions, "vpc_id")
    require.NotEmpty(t, vpcID)

    flowLogsEnabled := terraform.Output(t, terraformOptions, "vpc_flow_logs_enabled")
    require.Equal(t, "true", flowLogsEnabled, "VPC flow logs must be enabled for GDPR auditing")

    bucketID := terraform.Output(t, terraformOptions, "logging_bucket_id")
    encryption := aws.GetS3BucketEncryption(t, "eu-west-1", bucketID)
    require.Equal(t, "AES256", *encryption.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm)

    tags := aws.GetTagsForVpc(t, "eu-west-1", vpcID)
    require.Equal(t, "true", tags["GdprCompliant"])
    require.Equal(t, "EU", tags["DataResidency"])
}

13_CODE_J: Policy-as-Code Testing with OPA {#13_code_j}

Listing 13-J. Referenced from Chapter 13: Testing Strategies

Open Policy Agent (OPA) test examples demonstrating validation of S3 bucket encryption, EC2 security group requirements and GDPR data classification compliance.

# policies/infrastructure/security.rego
package infrastructure.security

default allow = true

deny[msg] {
  input.resource.type == "aws_s3_bucket"
  not input.resource.encryption.enabled
  msg := sprintf("Bucket %s must enable server-side encryption", [input.resource.id])
}

deny[msg] {
  input.resource.type == "aws_security_group_rule"
  input.resource.protocol == "tcp"
  input.resource.port == 22
  input.resource.cidr == "0.0.0.0/0"
  msg := sprintf("SSH rule %s exposes port 22 to the internet", [input.resource.id])
}

deny[msg] {
  input.resource.type == "aws_rds_instance"
  input.resource.tags["Classification"] == "confidential"
  not input.resource.encryption.enabled
  msg := sprintf("RDS instance %s storing confidential data must be encrypted", [input.resource.id])
}

# policies/infrastructure/security_test.rego
package infrastructure

test_enforces_bucket_encryption {
  results := data.infrastructure.security.deny with input as {
    "resource": {
      "id": "audit-logs",
      "type": "aws_s3_bucket",
      "encryption": {"enabled": false}
    }
  }

  results[0] == "Bucket audit-logs must enable server-side encryption"
}

test_blocks_internet_ssh {
  results := data.infrastructure.security.deny with input as {
    "resource": {
      "id": "sg-rule-1",
      "type": "aws_security_group_rule",
      "protocol": "tcp",
      "port": 22,
      "cidr": "0.0.0.0/0"
    }
  }

  results[0] == "SSH rule sg-rule-1 exposes port 22 to the internet"
}

test_requires_encrypted_confidential_databases {
  results := data.infrastructure.security.deny with input as {
    "resource": {
      "id": "rds-analytics",
      "type": "aws_rds_instance",
      "tags": {"Classification": "confidential"},
      "encryption": {"enabled": false}
    }
  }

  results[0] == "RDS instance rds-analytics storing confidential data must be encrypted"
}

13_CODE_K: Kubernetes Infrastructure Test Suite {#13_code_k}

Listing 13-K. Referenced from Chapter 13: Testing Strategies

Comprehensive Kubernetes infrastructure test suite demonstrating validation of resource quotas, pod security policies, network policies and GDPR-compliant persistent volume encryption using ConfigMap-based test runners and Kubernetes Jobs.

# k8s/tests/infrastructure-validation.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: infrastructure-test-runner
data:
  tests.sh: |
    #!/usr/bin/env bash
    set -euo pipefail

    echo "Validating namespace resource quotas..."
    kubectl get resourcequota -A || {
      echo "❌ Resource quotas missing"; exit 1;
    }

    echo "Validating pod security admission levels..."
    kubectl get pods -A -o json | jq '.items[].metadata.labels["pod-security.kubernetes.io/enforce"]' |
      grep -q "baseline" || {
      echo "❌ Pods missing baseline security enforcement"; exit 1;
    }

    echo "Validating encrypted persistent volumes..."
    kubectl get pv -o json | jq '.items[].metadata.annotations["encryption.alpha.kubernetes.io/encrypted"]' |
      grep -q "true" || {
      echo "❌ Persistent volumes must enable encryption"; exit 1;
    }

    echo "✅ Infrastructure validation passed"
---
apiVersion: batch/v1
kind: Job
metadata:
  name: infrastructure-validation
spec:
  template:
    spec:
      restartPolicy: Never
      serviceAccountName: platform-auditor
      containers:
        - name: validator
          image: bitnami/kubectl:1.29
          command: ["/bin/bash", "/scripts/tests.sh"]
          volumeMounts:
            - name: test-scripts
              mountPath: /scripts
      volumes:
        - name: test-scripts
          configMap:
            name: infrastructure-test-runner
            defaultMode: 0555

13_CODE_L: Infrastructure Testing Pipeline {#13_code_l}

Listing 13-L. Referenced from Chapter 13: Testing Strategies

Complete GitHub Actions workflow demonstrating infrastructure testing pipeline with static analysis (Terraform fmt, Checkov, OPA), unit testing (Terratest), integration testing with temporary environments, compliance validation (GDPR, encryption, regional restrictions), and performance testing with cost analysis.

# .github/workflows/infrastructure-pipeline.yml
name: Infrastructure Quality Gate

on:
  pull_request:
    paths:
      - 'infrastructure/**'
      - 'modules/**'
      - 'policies/**'
      - 'test/**'

jobs:
  static-analysis:
    name: Static Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Terraform fmt
        run: terraform fmt -check -recursive
      - name: Terraform validate
        run: |
          terraform -chdir=infrastructure init -backend=false
          terraform -chdir=infrastructure validate
      - name: Checkov security scan
        uses: bridgecrewio/checkov-action@v12
        with:
          directory: infrastructure

  policy:
    name: Policy-as-Code
    runs-on: ubuntu-latest
    needs: static-analysis
    steps:
      - uses: actions/checkout@v4
      - name: Evaluate OPA policies
        run: |
          opa test policies/infrastructure -v

  unit-tests:
    name: Terratest Suite
    runs-on: ubuntu-latest
    needs: policy
    steps:
      - uses: actions/checkout@v4
      - uses: hashicorp/setup-terraform@v3
      - name: Execute Terratest
        run: go test ./test -timeout 45m

  integration:
    name: Ephemeral Environment Validation
    runs-on: ubuntu-latest
    needs: unit-tests
    env:
      TF_IN_AUTOMATION: true
    steps:
      - uses: actions/checkout@v4
      - uses: hashicorp/setup-terraform@v3
      - name: Terraform init
        run: terraform init
      - name: Create workspace
        run: terraform workspace new pr-${{ github.event.number }} || terraform workspace select pr-${{ github.event.number }}
      - name: Terraform plan (ephemeral)
        run: terraform plan -out=tfplan
      - name: Terraform apply (ephemeral)
        run: terraform apply -auto-approve tfplan
      - name: Execute integration scripts
        run: ./scripts/validate-integration.sh
      - name: Terraform destroy
        if: always()
        run: terraform destroy -auto-approve

  compliance:
    name: Compliance Checks
    runs-on: ubuntu-latest
    needs: integration
    steps:
      - uses: actions/checkout@v4
      - name: Run requirements coverage
        run: |
          python3 -m pip install -r requirements.txt
          python3 requirements/report.py --catalogue requirements/catalogue.yaml --results reports/test-results.json

  performance:
    name: Performance and Cost Review
    runs-on: ubuntu-latest
    needs: compliance
    steps:
      - uses: actions/checkout@v4
      - name: Execute k6 performance tests
        run: k6 run tests/performance/k6-load-test.js
      - name: Analyse cost impact
        run: python3 scripts/cost_analysis.py --plan outputs/terraform-plan.json

  summary:
    name: Quality Summary
    runs-on: ubuntu-latest
    needs: [performance]
    steps:
      - name: Collate results
        run: ./scripts/summarise-quality-gates.sh

Organisational Change and Team Structures {#organisational-change}

17_CODE_1: Infrastructure Platform Team Blueprint {#17_code_1}

Listing 17-A. Referenced from chapter 17: Organisational Change and Team Structures

# organisational_design/devops_team_structure.yaml
team_structure:
  name: "Infrastructure Platform Team"
  size: 7
  mission: "Enable autonomous product teams through self-service infrastructure"

  roles:
    - role: "Team Lead / Product Owner"
      responsibilities:
        - "Strategic direction and product roadmap"
        - "Stakeholder communication"
        - "Resource allocation and prioritisation"
        - "Team development and performance management"
      skills_required:
        - "Product management"
        - "Technical leadership"
        - "Agile methodologies"
        - "Stakeholder management"

    - role: "Senior Infrastructure Engineer"
      count: 2
      responsibilities:
        - "Infrastructure as Code development"
        - "Cloud architecture design"
        - "Platform automation"
        - "Technical mentoring"
      skills_required:
        - "Terraform/CloudFormation expert"
        - "Multi-cloud platforms (AWS/Azure/GCP)"
        - "Containerisation (Docker/Kubernetes)"
        - "CI/CD pipelines"
        - "Programming (Python/Go/Bash)"

    - role: "Cloud Security Engineer"
      responsibilities:
        - "Security policy as code"
        - "Compliance automation"
        - "Threat modelling for cloud infrastructure"
        - "Security scanning integration"
      skills_required:
        - "Cloud security architecture best practices"
        - "Policy engines (OPA/AWS Config)"
        - "Security scanning tools"
        - "Compliance frameworks (ISO27001/SOC2)"

    - role: "Platform Automation Engineer"
      count: 2
      responsibilities:
        - "CI/CD pipeline development"
        - "Monitoring and observability"
        - "Self-service tool development"
        - "Developer experience improvement"
      skills_required:
        - "GitOps workflows"
        - "Monitoring stack (Prometheus/Grafana)"
        - "API development"
        - "Developer tooling"

    - role: "Site Reliability Engineer"
      responsibilities:
        - "Production operations"
        - "Incident response"
        - "Capacity planning"
        - "Performance optimisation"
      skills_required:
        - "Production operations"
        - "Incident management"
        - "Performance analysis"
        - "Automation scripting"

  working_agreements:
    daily_standup: "09:00 local time each weekday"
    sprint_length: "2 weeks"
    retrospective: "End of each sprint"
    on_call_rotation: "1 week rotation shared by SREs and Infrastructure Engineers"

  success_metrics:
    infrastructure_deployment_time: "< 15 minutes from commit to production"
    incident_resolution_time: "< 30 minutes for P1 incidents"
    developer_satisfaction: "> 4.5/5 in quarterly surveys"
    infrastructure_cost_efficiency: "10% yearly improvement"
    security_compliance_score: "> 95%"

  communication_patterns:
    internal_team:
      - "Daily stand-ups for coordination"
      - "Weekly technical deep dives"
      - "Monthly team retrospectives"
      - "Quarterly goal-setting sessions"

    external_stakeholders:
      - "Bi-weekly demos for product teams"
      - "Monthly steering committee updates"
      - "Quarterly business review presentations"
      - "Ad-hoc consultation for complex integrations"

  decision_making:
    technical_decisions: "Consensus among technical team members"
    architectural_decisions: "Technical lead with team input"
    strategic_decisions: "Product owner with business stakeholder input"
    operational_decisions: "On-call engineer authority with escalation path"

  continuous_improvement:
    learning_budget: "40 hours per person per quarter"
    conference_attendance: "2 team members per year at major conferences"
    experimentation_time: "20% time for innovation projects"
    knowledge_sharing: "Monthly internal tech talks"
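
A lightweight way to keep a blueprint like this honest is to validate it in a pipeline. The sketch below is illustrative rather than part of the book's toolchain: it assumes the file lives at organisational_design/devops_team_structure.yaml and that PyYAML is available, and simply checks that the declared team size matches the sum of the role counts.

# organisational_design/validate_team_blueprint.py (illustrative sketch)
import sys

import yaml  # PyYAML, assumed to be available


def validate_blueprint(path: str) -> None:
    """Fail fast if the declared team size disagrees with the role definitions."""
    with open(path, encoding="utf-8") as handle:
        team = yaml.safe_load(handle)["team_structure"]

    # Roles without an explicit count default to a single person
    role_total = sum(role.get("count", 1) for role in team["roles"])

    if role_total != team["size"]:
        sys.exit(f"Blueprint mismatch: size={team['size']} but roles add up to {role_total}")

    print(f"✅ {team['name']}: {role_total} roles match the declared size")


if __name__ == "__main__":
    validate_blueprint("organisational_design/devops_team_structure.yaml")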

17_CODE_2: IaC Competency Framework Utilities {#17_code_2}

Listing 17-B. Referenced from chapter 17: Organisational Change and Team Structures

# training/iac_competency_framework.py
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json


class IaCCompetencyFramework:
    """Comprehensive competency framework for Infrastructure as Code skills."""

    def __init__(self):
        self.competency_levels = {
            "novice": {
                "description": "Basic understanding, requires guidance",
                "hours_required": 40,
                "assessment_criteria": [
                    "Can execute predefined Architecture as Code templates",
                    "Understands foundational cloud concepts",
                    "Can follow established procedures"
                ]
            },
            "intermediate": {
                "description": "Can work independently on common tasks",
                "hours_required": 120,
                "assessment_criteria": [
                    "Can create simple Architecture as Code modules",
                    "Understands infrastructure dependencies",
                    "Can troubleshoot common issues"
                ]
            },
            "advanced": {
                "description": "Can design and lead complex implementations",
                "hours_required": 200,
                "assessment_criteria": [
                    "Can architect multi-environment solutions",
                    "Can mentor others effectively",
                    "Can design reusable patterns"
                ]
            },
            "expert": {
                "description": "Thought leader, can drive organisational standards",
                "hours_required": 300,
                "assessment_criteria": [
                    "Can drive organisational Architecture as Code strategy",
                    "Can design complex multi-cloud solutions",
                    "Can lead transformation initiatives"
                ]
            }
        }

        self.skill_domains = {
            "infrastructure_as_code": {
                "tools": ["Terraform", "CloudFormation", "Pulumi", "Ansible"],
                "concepts": ["Declarative syntax", "State management", "Module design"],
                "practices": ["Code organisation", "Testing strategies", "CI/CD integration"]
            },
            "cloud_platforms": {
                "aws": ["EC2", "VPC", "RDS", "Lambda", "S3", "IAM"],
                "azure": ["Virtual Machines", "Resource Groups", "Storage", "Functions"],
                "gcp": ["Compute Engine", "VPC", "Cloud Storage", "Cloud Functions"],
                "multi_cloud": ["Provider abstraction", "Cost optimisation", "Governance"]
            },
            "security_compliance": {
                "security": ["Identity management", "Network security", "Encryption"],
                "compliance": ["GDPR", "ISO27001", "SOC2", "Data residency"],
                "policy": ["Policy as Code", "Automated compliance", "Audit trails"]
            },
            "operations_monitoring": {
                "monitoring": ["Metrics collection", "Alerting", "Dashboards"],
                "logging": ["Log aggregation", "Analysis", "Retention"],
                "incident_response": ["Runbooks", "Post-mortems", "Automation"]
            }
        }

    def create_learning_path(self, current_level: str, target_level: str,
                             focus_domains: List[str]) -> Dict:
        """Create a personalised learning path for an individual."""

        current_hours = self.competency_levels[current_level]["hours_required"]
        target_hours = self.competency_levels[target_level]["hours_required"]
        required_hours = target_hours - current_hours

        learning_path = {
            "individual_id": f"learner_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            "current_level": current_level,
            "target_level": target_level,
            "estimated_duration_hours": required_hours,
            "estimated_timeline_weeks": required_hours // 10,  # 10 hours per week
            "focus_domains": focus_domains,
            "learning_modules": []
        }

        # Generate learning modules based on focus domains
        for domain in focus_domains:
            if domain in self.skill_domains:
                modules = self._generate_domain_modules(domain, current_level, target_level)
                learning_path["learning_modules"].extend(modules)

        return learning_path

    def _generate_domain_modules(self, domain: str, current_level: str,
                                 target_level: str) -> List[Dict]:
        """Generate learning modules for a specific domain."""

        modules = []
        domain_skills = self.skill_domains[domain]

        # Terraform Fundamentals Module
        if domain == "infrastructure_as_code":
            modules.append({
                "name": "Terraform Fundamentals for Regulated Enterprises",
                "duration_hours": 16,
                "type": "hands_on_workshop",
                "prerequisites": ["Basic Linux", "Cloud fundamentals"],
                "learning_objectives": [
                    "Build foundational Terraform configurations",
                    "Manage remote state securely",
                    "Design compliance-aligned infrastructure patterns",
                    "Integrate controls with platform tooling"
                ],
                "practical_exercises": [
                    "Deploy a data-protection compliant object storage bucket",
                    "Create a VPC with network guardrails",
                    "Implement IAM policies aligned to least privilege",
                    "Configure monitoring aligned with regulatory obligations"
                ],
                "assessment": {
                    "type": "practical_project",
                    "description": "Deploy a complete web application stack with automated governance controls"
                }
            })

        # Cloud Security Module
        if domain == "security_compliance":
            modules.append({
                "name": "Cloud Security for Regulated Environments",
                "duration_hours": 12,
                "type": "blended_learning",
                "prerequisites": ["Cloud fundamentals", "Security basics"],
                "learning_objectives": [
                    "Implement privacy-aware infrastructure baselines",
                    "Apply regulatory control frameworks in code",
                    "Create automated compliance checks",
                    "Design secure network topologies"
                ],
                "practical_exercises": [
                    "Create a compliance-aligned data pipeline",
                    "Implement policy-as-code guardrails",
                    "Set up automated compliance monitoring",
                    "Design an incident response workflow"
                ],
                "assessment": {
                    "type": "compliance_audit",
                    "description": "Demonstrate infrastructure meets regulatory security requirements"
                }
            })

        return modules

    def track_progress(self, individual_id: str, completed_module: str,
                       assessment_score: float) -> Dict:
        """Track learning progress for an individual."""

        progress_record = {
            "individual_id": individual_id,
            "module_completed": completed_module,
            "completion_date": datetime.now().isoformat(),
            "assessment_score": assessment_score,
            "certification_earned": assessment_score >= 0.8,
            "next_recommended_module": self._recommend_next_module(individual_id)
        }

        return progress_record

    def generate_team_competency_matrix(self, team_members: List[Dict]) -> Dict:
        """Generate a team competency matrix for skills gap analysis."""

        competency_matrix = {
            "team_id": f"team_{datetime.now().strftime('%Y%m%d')}",
            "assessment_date": datetime.now().isoformat(),
            "team_size": len(team_members),
            "overall_readiness": 0,
            "skill_gaps": [],
            "training_recommendations": [],
            "members": []
        }

        total_competency = 0

        for member in team_members:
            member_assessment = {
                "name": member["name"],
                "role": member["role"],
                "current_skills": member.get("skills", {}),
                "competency_score": self._calculate_competency_score(member),
                "development_needs": self._identify_development_needs(member),
                "certification_status": member.get("certifications", [])
            }

            competency_matrix["members"].append(member_assessment)
            total_competency += member_assessment["competency_score"]

        competency_matrix["overall_readiness"] = total_competency / len(team_members)
        competency_matrix["skill_gaps"] = self._identify_team_skill_gaps(team_members)
        competency_matrix["training_recommendations"] = self._recommend_team_training(competency_matrix)

        return competency_matrix


def create_organizational_change_plan(organization_assessment: Dict) -> Dict:
    """Create a comprehensive organisational change plan for Architecture as Code adoption."""

    change_plan = {
        "organization": organization_assessment["name"],
        "current_state": organization_assessment["current_maturity"],
        "target_state": "advanced_devops",
        "timeline_months": 18,
        "phases": [
            {
                "name": "Foundation Building",
                "duration_months": 6,
                "objectives": [
                    "Establish DevOps culture basics",
                    "Implement foundational Architecture as Code practices",
                    "Create cross-functional teams",
                    "Set up the initial toolchain"
                ],
                "activities": [
                    "DevOps culture workshops",
                    "Tool selection and setup",
                    "Team restructuring",
                    "Initial training programme",
                    "Pilot project implementation"
                ],
                "success_criteria": [
                    "All teams trained on DevOps fundamentals",
                    "Initial Architecture as Code deployment pipeline operational",
                    "Cross-functional teams established",
                    "Core toolchain adopted"
                ]
            },
            {
                "name": "Capability Development",
                "duration_months": 8,
                "objectives": [
                    "Scale Architecture as Code practices across the organisation",
                    "Implement advanced automation",
                    "Establish monitoring and observability",
                    "Mature incident response processes"
                ],
                "activities": [
                    "Advanced Architecture as Code training rollout",
                    "Multi-environment deployment automation",
                    "Comprehensive monitoring implementation",
                    "Incident response process development",
                    "Security integration (DevSecOps)"
                ],
                "success_criteria": [
                    "Architecture as Code practices adopted by all product teams",
                    "Automated deployment across all environments",
                    "Comprehensive observability implemented",
                    "Incident response processes matured"
                ]
            },
            {
                "name": "Optimisation and Innovation",
                "duration_months": 4,
                "objectives": [
                    "Optimise existing processes",
                    "Implement advanced practices",
                    "Foster continuous innovation",
                    "Measure and improve business outcomes"
                ],
                "activities": [
                    "Process optimisation based on metrics",
                    "Advanced practice implementation",
                    "Innovation time allocation",
                    "Business value measurement",
                    "Knowledge sharing programme"
                ],
                "success_criteria": [
                    "Optimised processes delivering measurable value",
                    "Innovation culture established",
                    "Improved business outcomes",
                    "Self-sustaining improvement culture"
                ]
            }
        ],
        "change_management": {
            "communication_strategy": [
                "Monthly all-hands updates",
                "Quarterly progress reviews",
                "Success story sharing",
                "Feedback collection mechanisms"
            ],
            "resistance_management": [
                "Early stakeholder engagement",
                "Addressing skill development concerns",
                "Providing clear career progression paths",
                "Celebrating early wins"
            ],
            "success_measurement": [
                "Employee satisfaction surveys",
                "Technical capability assessments",
                "Business value metrics",
                "Cultural transformation indicators"
            ]
        },
        "risk_mitigation": [
            "Gradual rollout to minimise disruption",
            "Comprehensive training to address skill gaps",
            "Clear communication to manage expectations",
            "Strong support structure for teams"
        ]
    }

    return change_plan
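
The listings above are meant to be driven by a thin orchestration layer. A minimal usage sketch follows; it assumes it runs in the same module as the framework code, and the organisation name and maturity value are placeholders rather than outputs from a real assessment.

# training/run_competency_planning.py (illustrative usage sketch)
framework = IaCCompetencyFramework()

# Plan an upskilling journey for one engineer moving from intermediate to advanced
learning_path = framework.create_learning_path(
    current_level="intermediate",
    target_level="advanced",
    focus_domains=["infrastructure_as_code", "security_compliance"],
)
print(f"Estimated effort: {learning_path['estimated_duration_hours']} hours "
      f"over roughly {learning_path['estimated_timeline_weeks']} weeks")

# Derive an 18-month adoption plan from a (hypothetical) maturity assessment
change_plan = create_organizational_change_plan({
    "name": "Example Organisation",
    "current_maturity": "baseline_devops",
})
for phase in change_plan["phases"]:
    print(f"{phase['name']}: {phase['duration_months']} months")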

17_CODE_3: DevOps Performance Measurement Framework {#17_code_3}

Listing 17-C. Referenced from chapter 17: Organisational Change and Team Structures

# metrics/devops_performance_metrics.yaml
performance_measurement_framework:
  name: "DevOps Team Performance Metrics"

  technical_metrics:
    deployment_frequency:
      description: "How often the team deploys to production"
      measurement: "Deployments per day/week"
      target_values:
        elite: "> 1 per day"
        high: "1 per week - 1 per day"
        medium: "1 per month - 1 per week"
        low: "< 1 per month"
      collection_method: "Automated from CI/CD pipeline"

    lead_time_for_changes:
      description: "Time from code commit to production deployment"
      measurement: "Hours/days"
      target_values:
        elite: "< 1 hour"
        high: "1 day - 1 week"
        medium: "1 week - 1 month"
        low: "> 1 month"
      collection_method: "Git and deployment tool integration"

    mean_time_to_recovery:
      description: "Time to recover from production incidents"
      measurement: "Hours"
      target_values:
        elite: "< 1 hour"
        high: "< 1 day"
        medium: "1 day - 1 week"
        low: "> 1 week"
      collection_method: "Incident management systems"

    change_failure_rate:
      description: "Percentage of deployments causing production issues"
      measurement: "Percentage"
      target_values:
        elite: "0-15%"
        high: "16-30%"
        medium: "31-45%"
        low: "> 45%"
      collection_method: "Incident correlation with deployments"

  business_metrics:

    infrastructure_cost_efficiency:
      description: "Cost per unit of business value delivered"
      measurement: "Local currency per transaction/user/feature"
      target: "10% yearly improvement"
      collection_method: "Cloud billing API integration"

    developer_productivity:
      description: "Developer self-service capability"
      measurement: "Hours spent on infrastructure tasks per sprint"
      target: "< 20% of development time"
      collection_method: "Time tracking and developer surveys"

    compliance_adherence:
      description: "Adherence to regulatory requirements"
      measurement: "Compliance score (0-100%)"
      target: "> 95%"
      collection_method: "Automated compliance scanning"

    customer_satisfaction:
      description: "Internal customer (developer) satisfaction"
      measurement: "Net Promoter Score"
      target: "> 50"
      collection_method: "Quarterly developer surveys"

  cultural_metrics:

    psychological_safety:
      description: "Team member comfort with taking risks and admitting mistakes"
      measurement: "Survey score (1-5)"
      target: "> 4.0"
      collection_method: "Quarterly team health surveys"

    learning_culture:
      description: "Investment in learning and experimentation"
      measurement: "Hours per person per quarter"
      target: "> 40 hours"
      collection_method: "Learning management systems"

    collaboration_effectiveness:
      description: "Cross-functional team collaboration quality"
      measurement: "Survey score (1-5)"
      target: "> 4.0"
      collection_method: "360-degree feedback"

    innovation_rate:
      description: "Number of new ideas/experiments per quarter"
      measurement: "Count per team member"
      target: "> 2 per quarter"
      collection_method: "Innovation tracking systems"

  collection_automation:

    data_sources:
      - "GitHub/GitLab API for code metrics"
      - "Jenkins/GitLab CI for deployment metrics"
      - "PagerDuty/OpsGenie for incident metrics"
      - "AWS/Azure billing API for cost metrics"
      - "Survey tools for cultural metrics"

    dashboard_tools:
      - "Grafana for technical metrics visualisation"
      - "Tableau for business metrics analysis"
      - "Internal dashboards for team metrics"

    reporting_schedule:
      daily: ["Deployment frequency", "Incident count"]
      weekly: ["Lead time trends", "Cost analysis"]
      monthly: ["Team performance review", "Business value assessment"]
      quarterly: ["Cultural metrics", "Strategic review"]

  improvement_process:

    metric_review:
      frequency: "Monthly team retrospectives"
      participants: ["Team members", "Product owner", "Engineering manager"]
      outcome: "Improvement actions with owners and timelines"

    benchmarking:
      internal: "Compare teams within the organisation"
      industry: "Compare with DevOps industry standards"
      regional: "Compare with peer organisations"

    action_planning:
      identification: "Identify lowest-performing metrics"
      root_cause: "Analyse underlying causes"
      solutions: "Develop targeted improvement initiatives"
      tracking: "Monitor improvement progress monthly"
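
Raw pipeline data only becomes useful once it is mapped onto the bands above automatically. The sketch below shows one way that mapping could work for deployment frequency; the thresholds mirror the table, while the function name and sample figures are illustrative assumptions.

# metrics/classify_deployment_frequency.py (illustrative sketch)
def classify_deployment_frequency(deploys_per_week: float) -> str:
    """Map a measured deployment rate onto the performance bands defined above."""
    if deploys_per_week > 7:      # more than one deployment per day
        return "elite"
    if deploys_per_week >= 1:     # between one per week and one per day
        return "high"
    if deploys_per_week >= 0.25:  # roughly one per month up to one per week
        return "medium"
    return "low"                  # less than one per month


# Example: 12 production deployments recorded over a four-week window
band = classify_deployment_frequency(12 / 4)
print(f"Deployment frequency band: {band}")  # -> high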

References and Navigation

Each code example in this appendix can be referenced from the main text using its unique identifier. To find specific implementations:

  1. Use the search function - Search for a code type or technology (e.g., "Terraform", "CloudFormation", "Python")
  2. Follow the categories - Navigate to the relevant section based on your use case
  3. Use cross-references - Follow links back to the main chapters for context

Conventions for Code Examples

  • Comments: All code examples contain comments for clarity
  • Security: Security aspects are marked with 🔒
  • GDPR compliance: GDPR-related configurations are marked with 🇪🇺
  • Customisations: Local customisations are marked with 🇸🇪

Updates and Maintenance

This appendix is updated regularly when new code examples are added to the book's main chapters. For the latest version of code examples, see the book's GitHub repository.


For more information about specific implementations, see the respective main chapters where the code examples are introduced and explained in their context.

Chapter 14 Reference Implementations

The extended Terraform implementation that once appeared inline in Chapter 14 is preserved here so that readers can access the full listing without interrupting the main narrative. Cross-references in the chapter point to these appendix entries for quick navigation.

Note: Moving the code to Appendix A keeps Chapter 14 focused on adoption strategy while still providing the complete infrastructure example for practitioners.

14_CODE_1: Terraform service blueprint for a web application landing zone {#14_code_1}

Referenced from Chapter 14: Architecture as Code in Practice

This module demonstrates how to package core networking, load balancing, and tagging conventions into a reusable Terraform component that platform teams can roll out across environments.

# modules/web-application/main.tf
variable "environment" {
  description = "Environment name (dev, staging, prod)"
  type        = string
}

variable "application_name" {
  description = "Name of the application"
  type        = string
}

variable "instance_count" {
  description = "Number of application instances"
  type        = number
  default     = 2
}

# Availability zones used for subnet placement
data "aws_availability_zones" "available" {
  state = "available"
}

# VPC and networking
resource "aws_vpc" "main" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name        = "${var.application_name}-${var.environment}-vpc"
    Environment = var.environment
    Application = var.application_name
  }
}

resource "aws_subnet" "public" {
  count             = 2
  vpc_id            = aws_vpc.main.id
  cidr_block        = "10.0.${count.index + 1}.0/24"
  availability_zone = data.aws_availability_zones.available.names[count.index]

  map_public_ip_on_launch = true

  tags = {
    Name = "${var.application_name}-${var.environment}-public-${count.index + 1}"
    Type = "Public"
  }
}

# Application Load Balancer
resource "aws_lb" "main" {
  name               = "${var.application_name}-${var.environment}-alb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.alb.id]
  subnets            = aws_subnet.public[*].id

  enable_deletion_protection = false

  tags = {
    Environment = var.environment
    Application = var.application_name
  }
}

# Auto Scaling Group
resource "aws_autoscaling_group" "main" {
  name                = "${var.application_name}-${var.environment}-asg"
  vpc_zone_identifier = aws_subnet.public[*].id
  target_group_arns   = [aws_lb_target_group.main.arn]
  health_check_type   = "ELB"
  health_check_grace_period = 300

  min_size         = 1
  max_size         = 10
  desired_capacity = var.instance_count

  launch_template {
    id      = aws_launch_template.main.id
    version = "$Latest"
  }

  tag {
    key                 = "Name"
    value               = "${var.application_name}-${var.environment}-instance"
    propagate_at_launch = true
  }

  tag {
    key                 = "Environment"
    value               = var.environment
    propagate_at_launch = true
  }
}

# Outputs
output "load_balancer_dns" {
  description = "DNS name of the load balancer"
  value       = aws_lb.main.dns_name
}

output "vpc_id" {
  description = "ID of the VPC"
  value       = aws_vpc.main.id
}

14_CODE_2: Environment configuration and observability controls {#14_code_2}

Referenced from Chapter 14: Architecture as Code in Practice

This configuration layers production-specific settings on top of the shared module, including state management, default tags, and a CloudWatch dashboard for operational oversight.

# environments/production/main.tf
terraform {
  required_version = ">= 1.0"

  backend "s3" {
    bucket         = "company-terraform-state-prod"
    key            = "web-application/terraform.tfstate"
    region         = "us-west-2"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"
  }

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

provider "aws" {
  region = "us-west-2"

  default_tags {
    tags = {
      Project     = "web-application"
      Environment = "production"
      ManagedBy   = "terraform"
      Owner       = "platform-team"
    }
  }
}

module "web_application" {
  source = "../../modules/web-application"

  environment      = "production"
  application_name = "company-web-app"
  instance_count   = 6

  # Production-specific overrides
  enable_monitoring = true
  backup_retention  = 30
  multi_az          = true
}

# Production-specific resources
resource "aws_cloudwatch_dashboard" "main" {
  dashboard_name = "WebApplication-Production"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6

        properties = {
          metrics = [
            ["AWS/ApplicationELB", "RequestCount", "LoadBalancer", module.web_application.load_balancer_arn_suffix],
            [".", "TargetResponseTime", ".", "."],
            [".", "HTTPCode_ELB_5XX_Count", ".", "."]
          ]
          view    = "timeSeries"
          stacked = false
          region  = "us-west-2"
          title   = "Application Performance"
          period  = 300
        }
      }
    ]
  })
}

14_CODE_3: Continuous delivery workflow for infrastructure changes {#14_code_3}

Referenced from Chapter 14: Architecture as Code in Practice

The workflow below demonstrates how to plan and apply Terraform changes across development, staging, and production with explicit separation between planning and deployment stages.

# .github/workflows/infrastructure.yml
name: Infrastructure Deployment

on:
  push:
    branches: [main]
    paths: ['infrastructure/**']
  pull_request:
    branches: [main]
    paths: ['infrastructure/**']

env:
  TF_VERSION: 1.5.0
  AWS_REGION: us-west-2

jobs:
  plan:
    name: Terraform Plan
    runs-on: ubuntu-latest
    strategy:
      matrix:
        environment: [development, staging, production]

    steps:
    - name: Checkout code
      uses: actions/checkout@v3

    - name: Setup Terraform
      uses: hashicorp/setup-terraform@v2
      with:
        terraform_version: ${{ env.TF_VERSION }}

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: ${{ env.AWS_REGION }}

    - name: Terraform Init
      working-directory: infrastructure/environments/${{ matrix.environment }}
      run: terraform init

    - name: Terraform Validate
      working-directory: infrastructure/environments/${{ matrix.environment }}
      run: terraform validate

    - name: Terraform Plan
      working-directory: infrastructure/environments/${{ matrix.environment }}
      run: |
        terraform plan -out=tfplan-${{ matrix.environment }} \
          -var-file="terraform.tfvars"

    - name: Upload plan artifact
      uses: actions/upload-artifact@v3
      with:
        name: tfplan-${{ matrix.environment }}
        path: infrastructure/environments/${{ matrix.environment }}/tfplan-${{ matrix.environment }}
        retention-days: 30

  deploy:
    name: Terraform Apply
    runs-on: ubuntu-latest
    needs: plan
    if: github.ref == 'refs/heads/main'
    strategy:
      matrix:
        environment: [development, staging]
        # Production requires manual approval

    environment: ${{ matrix.environment }}

    steps:
    - name: Checkout code
      uses: actions/checkout@v3

    - name: Setup Terraform
      uses: hashicorp/setup-terraform@v2
      with:
        terraform_version: ${{ env.TF_VERSION }}

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: ${{ env.AWS_REGION }}

    - name: Download plan artifact
      uses: actions/download-artifact@v3
      with:
        name: tfplan-${{ matrix.environment }}
        path: infrastructure/environments/${{ matrix.environment }}

    - name: Terraform Init
      working-directory: infrastructure/environments/${{ matrix.environment }}
      run: terraform init

    - name: Terraform Apply
      working-directory: infrastructure/environments/${{ matrix.environment }}
      run: terraform apply tfplan-${{ matrix.environment }}

  production-deploy:
    name: Production Deployment
    runs-on: ubuntu-latest
    needs: [plan, deploy]
    if: github.ref == 'refs/heads/main'
    environment:
      name: production
      url: ${{ steps.deploy.outputs.application_url }}

    steps:
    - name: Manual approval checkpoint
      run: echo "Production deployment requires manual approval"

    # Similar steps as deploy job but for production environment

Chapter 15 Reference Implementations

15_CODE_1: Cost-aware Terraform infrastructure configuration {#15_code_1}

Referenced from Chapter 15: Cost Optimisation and Resource Management

This Terraform configuration demonstrates comprehensive cost optimisation strategies including budget management, cost allocation tagging, and intelligent instance type selection using spot instances and mixed instance policies.

# cost_optimized_infrastructure.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# Cost allocation tags for all infrastructure
locals {
  cost_tags = {
    CostCentre     = var.cost_centre
    Project        = var.project_name
    Environment    = var.environment
    Owner          = var.team_email
    BudgetAlert    = var.budget_threshold
    ReviewDate     = formatdate("YYYY-MM-DD", timeadd(timestamp(), "720h")) # 30 days ahead
    Region         = var.aws_region
  }
}

# Budget with automatic alerts (EUR equivalent tracked separately for EU reporting)
resource "aws_budgets_budget" "project_budget" {
  name         = "${var.project_name}-budget"
  budget_type  = "COST"
  limit_amount = var.monthly_budget_limit_usd  # AWS bills in USD
  limit_unit   = "USD"
  time_unit    = "MONTHLY"

  cost_filters = {
    Tag = {
      Project = [var.project_name]
    }
    Region = [var.aws_region]  # Filter by EU region
  }

  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 80
    threshold_type             = "PERCENTAGE"
    notification_type          = "ACTUAL"
    subscriber_email_addresses = [var.team_email, var.finance_email]
  }

  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 100
    threshold_type             = "PERCENTAGE"
    notification_type          = "FORECASTED"
    subscriber_email_addresses = [var.team_email, var.finance_email]
  }
}

# Cost-optimised EC2 with Spot instances
resource "aws_launch_template" "cost_optimized" {
  name_prefix   = "${var.project_name}-cost-opt-"
  image_id      = data.aws_ami.amazon_linux.id

  # Mixed instance types for cost optimisation
  instance_requirements {
    memory_mib {
      min = 2048
      max = 8192
    }
    vcpu_count {
      min = 1
      max = 4
    }
    instance_generations = ["current"]
  }

  # Spot instance preference for cost optimisation
  instance_market_options {
    market_type = "spot"
    spot_options {
      max_price = var.max_spot_price
    }
  }

  tag_specifications {
    resource_type = "instance"
    tags = local.cost_tags
  }
}

# Auto Scaling with cost considerations
resource "aws_autoscaling_group" "cost_aware" {
  name                = "${var.project_name}-cost-aware-asg"
  vpc_zone_identifier = var.private_subnet_ids
  min_size            = var.min_instances
  max_size            = var.max_instances
  desired_capacity    = var.desired_instances

  # Mixed instance type strategy for cost optimisation
  mixed_instances_policy {
    instances_distribution {
      on_demand_base_capacity                  = 1
      on_demand_percentage_above_base_capacity = 20
      spot_allocation_strategy                 = "diversified"
    }

    launch_template {
      launch_template_specification {
        launch_template_id = aws_launch_template.cost_optimized.id
        version            = "$Latest"
      }
    }
  }

  tag {
    key                 = "Name"
    value               = "${var.project_name}-cost-optimized"
    propagate_at_launch = true
  }

  dynamic "tag" {
    for_each = local.cost_tags
    content {
      key                 = tag.key
      value               = tag.value
      propagate_at_launch = true
    }
  }
}

15_CODE_2: Kubernetes cost optimisation manifests {#15_code_2}

Referenced from Chapter 15: Cost Optimisation and Resource Management

These Kubernetes manifests demonstrate resource quotas, limit ranges, and autoscaling configurations for cost-effective workload management.

# kubernetes/cost-optimization-quota.yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: cost-control-quota
  namespace: production
spec:
  hard:
    requests.cpu: "20"
    requests.memory: 40Gi
    limits.cpu: "40"
    limits.memory: 80Gi
    persistentvolumeclaims: "10"
    count/pods: "50"
    count/services: "10"
---
# kubernetes/cost-optimization-limits.yaml
apiVersion: v1
kind: LimitRange
metadata:
  name: cost-control-limits
  namespace: production
spec:
  limits:
  - default:
      cpu: "500m"
      memory: "1Gi"
    defaultRequest:
      cpu: "100m"
      memory: "256Mi"
    max:
      cpu: "2"
      memory: "4Gi"
    min:
      cpu: "50m"
      memory: "128Mi"
    type: Container
---
# kubernetes/vertical-pod-autoscaler.yaml
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: cost-optimized-vpa
  namespace: production
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: web-application
  updatePolicy:
    updateMode: "Auto"
  resourcePolicy:
    containerPolicies:
    - containerName: app
      maxAllowed:
        cpu: "1"
        memory: "2Gi"
      minAllowed:
        cpu: "100m"
        memory: "256Mi"
---
# kubernetes/horizontal-pod-autoscaler.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cost-aware-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: web-application
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60

15_CODE_3: AWS cost monitoring and optimisation automation {#15_code_3}

Referenced from Chapter 15: Cost Optimisation and Resource Management

This Python script provides automated cost analysis, rightsizing recommendations, and identification of unused resources for AWS environments.

# cost_monitoring/cost_optimizer.py
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd

class AWSCostOptimizer:
    """
    Automated cost optimisation for AWS resources across EU regions
    """

    def __init__(self, region='eu-west-1', eu_regions=None):
        """
        Initialise cost optimiser for EU regions.

        Args:
            region: Primary AWS region (defaults to eu-west-1, Ireland)
            eu_regions: List of EU regions to analyse (defaults to major EU regions)
        """
        if eu_regions is None:
            # Default EU regions for multi-region cost analysis
            eu_regions = ['eu-west-1', 'eu-central-1', 'eu-west-2', 'eu-west-3', 'eu-south-1', 'eu-north-1']

        self.region = region
        self.eu_regions = eu_regions
        self.cost_explorer = boto3.client('ce', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def analyze_cost_trends(self, days_back=30) -> Dict:
        """Analyse cost trends for the last period across EU regions"""

        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=days_back)

        response = self.cost_explorer.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity='DAILY',
            Metrics=['BlendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                {'Type': 'TAG', 'Key': 'Project'},
                {'Type': 'DIMENSION', 'Key': 'REGION'}
            ],
            Filter={
                'Dimensions': {
                    'Key': 'REGION',
                    'Values': self.eu_regions
                }
            }
        )

        return self._process_cost_data(response)

    def identify_rightsizing_opportunities(self) -> List[Dict]:
        """Identify EC2 instances that can be rightsized across EU regions"""

        rightsizing_response = self.cost_explorer.get_rightsizing_recommendation(
            Service='AmazonEC2',
            Configuration={
                'BenefitsConsidered': True,
                'RecommendationTarget': 'SAME_INSTANCE_FAMILY'
            },
            Filter={
                'Dimensions': {
                    'Key': 'REGION',
                    'Values': self.eu_regions
                }
            }
        )

        opportunities = []

        for recommendation in rightsizing_response.get('RightsizingRecommendations', []):
            if recommendation['RightsizingType'] == 'Modify':
                opportunities.append({
                    'instance_id': recommendation['CurrentInstance']['ResourceId'],
                    'current_type': recommendation['CurrentInstance']['InstanceName'],
                    'recommended_type': recommendation['ModifyRecommendationDetail']['TargetInstances'][0]['InstanceName'],
                    'estimated_monthly_savings_usd': float(recommendation['ModifyRecommendationDetail']['TargetInstances'][0]['EstimatedMonthlySavings']),
                    'utilisation': recommendation['CurrentInstance']['UtilizationMetrics'],
                    'region': recommendation['CurrentInstance'].get('Region', 'unknown')
                })

        return opportunities

    def get_unused_resources(self) -> Dict:
        """Identify unused resources that can be terminated across EU regions"""

        unused_resources = {
            'unattached_volumes': self._find_unattached_ebs_volumes(),
            'unused_elastic_ips': self._find_unused_elastic_ips(),
            'idle_load_balancers': self._find_idle_load_balancers(),
            'stopped_instances': self._find_stopped_instances()
        }

        return unused_resources

    def generate_cost_optimization_plan(self, project_tag: str) -> Dict:
        """Generate comprehensive cost optimisation plan for EU regions"""

        plan = {
            'project': project_tag,
            'analysis_date': datetime.now().isoformat(),
            'eu_regions_analysed': self.eu_regions,
            'current_monthly_cost_usd': self._get_current_monthly_cost(project_tag),
            'recommendations': {
                'rightsizing': self.identify_rightsizing_opportunities(),
                'unused_resources': self.get_unused_resources(),
                'reserved_instances': self._analyze_reserved_instance_opportunities(),
                'spot_instances': self._analyze_spot_instance_opportunities()
            },
            'potential_monthly_savings_usd': 0,
            'notes': 'Costs are in USD (AWS billing currency). Convert to EUR using current exchange rate for EU financial reporting.'
        }

        # Calculate total potential savings
        total_savings = 0
        for rec_type, recommendations in plan['recommendations'].items():
            if isinstance(recommendations, list):
                total_savings += sum(rec.get('estimated_monthly_savings_usd', 0) for rec in recommendations)
            elif isinstance(recommendations, dict):
                total_savings += recommendations.get('estimated_monthly_savings_usd', 0)

        plan['potential_monthly_savings_usd'] = total_savings
        plan['savings_percentage'] = (total_savings / plan['current_monthly_cost_usd']) * 100 if plan['current_monthly_cost_usd'] > 0 else 0

        return plan

    def _find_unattached_ebs_volumes(self) -> List[Dict]:
        """Find unattached EBS volumes across EU regions"""

        unattached_volumes = []

        for region in self.eu_regions:
            ec2_regional = boto3.client('ec2', region_name=region)
            response = ec2_regional.describe_volumes(
                Filters=[{'Name': 'status', 'Values': ['available']}]
            )

            for volume in response['Volumes']:
                # Calculate monthly cost based on volume size and type
                monthly_cost = self._calculate_ebs_monthly_cost(volume, region)

                unattached_volumes.append({
                    'volume_id': volume['VolumeId'],
                    'size_gb': volume['Size'],
                    'volume_type': volume['VolumeType'],
                    'region': region,
                    'estimated_monthly_savings_usd': monthly_cost,
                    'creation_date': volume['CreateTime'].isoformat()
                })

        return unattached_volumes

    def _calculate_ebs_monthly_cost(self, volume: Dict, region: str) -> float:
        """Calculate monthly cost for EBS volume in specific EU region"""

        # Example pricing for EU regions (USD per GB/month)
        # Prices vary slightly by region - these are representative values
        regional_pricing = {
            'eu-west-1': {  # Ireland
                'gp3': 0.088,
                'gp2': 0.110,
                'io1': 0.138,
                'io2': 0.138,
                'st1': 0.048,
                'sc1': 0.026
            },
            'eu-central-1': {  # Frankfurt
                'gp3': 0.095,
                'gp2': 0.119,
                'io1': 0.149,
                'io2': 0.149,
                'st1': 0.052,
                'sc1': 0.028
            }
        }

        # Default to eu-west-1 pricing if region not found
        pricing = regional_pricing.get(region, regional_pricing['eu-west-1'])
        cost_per_gb = pricing.get(volume['VolumeType'], 0.110)  # Default to gp2
        return volume['Size'] * cost_per_gb

def generate_terraform_cost_optimizations(cost_plan: Dict) -> str:
    """Generate Terraform code to implement cost optimisations"""

    terraform_code = """
# Automatically generated cost optimisations
# Generated: {date}
# Project: {project}
# EU Regions: {regions}
# Potential monthly savings: ${savings:.2f} USD
# Note: Convert to EUR using current exchange rate for financial reporting

""".format(
        date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        project=cost_plan['project'],
        regions=', '.join(cost_plan.get('eu_regions_analysed', [])),
        savings=cost_plan['potential_monthly_savings_usd']
    )

    # Generate spot instance configurations
    if cost_plan['recommendations']['spot_instances']:
        terraform_code += """
# Spot Instance Configuration for cost optimisation
resource "aws_launch_template" "spot_optimized" {{
  name_prefix   = "{project}-spot-"

  instance_market_options {{
    market_type = "spot"
    spot_options {{
      max_price = "{max_spot_price}"
    }}
  }}

  # Cost allocation tags
  tag_specifications {{
    resource_type = "instance"
    tags = {{
      Project = "{project}"
      CostOptimisation = "spot-instance"
      EstimatedSavings = "${estimated_savings}"
    }}
  }}
}}
""".format(
            project=cost_plan['project'],
            max_spot_price=cost_plan['recommendations']['spot_instances'].get('recommended_max_price', '0.10'),
            estimated_savings=cost_plan['recommendations']['spot_instances'].get('estimated_monthly_savings_usd', 0)
        )

    return terraform_code
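
In practice the optimiser class and the Terraform generator above are stitched together by a small driver script. A minimal sketch follows; it assumes it runs alongside the class above, that AWS credentials are already configured in the environment, and that the project tag and output file name are hypothetical values.

# cost_monitoring/run_cost_review.py (illustrative driver sketch)
optimizer = AWSCostOptimizer(region="eu-west-1")

# Build the optimisation plan for one tagged project across the default EU regions
plan = optimizer.generate_cost_optimization_plan(project_tag="web-platform")
print(f"Potential monthly savings: ${plan['potential_monthly_savings_usd']:.2f} USD "
      f"({plan['savings_percentage']:.1f}% of current spend)")

# Emit a Terraform snippet that codifies the recommended optimisations
snippet = generate_terraform_cost_optimizations(plan)
with open("cost_optimizations.tf", "w", encoding="utf-8") as handle:
    handle.write(snippet)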

Security and Compliance {#security-compliance}

10_CODE_1: Advanced Policy-as-Code module for EU compliance {#10_code_1}

Referenced from Chapter 10: Policy and Security as Code in Detail. This Rego module consolidates encryption validation, network segmentation checks inspired by MSB guidance, and GDPR Article 44 data residency controls. It generates a composite compliance score so teams can fail builds or raise alerts when thresholds are breached.

package se.enterprise.security

import future.keywords.in

encryption_required_services := {
    "aws_s3_bucket",
    "aws_rds_instance",
    "aws_rds_cluster",
    "aws_efs_file_system",
    "aws_dynamodb_table",
    "aws_redshift_cluster",
    "aws_elasticsearch_domain"
}

administrative_ports := {22, 3389, 5432, 3306, 1433, 27017, 6379, 9200, 5601}
allowed_public_ports := {80, 443}
eu_regions := {"eu-north-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-central-1", "eu-south-1"}

encryption_compliant[resource] {
    resource := input.resources[_]
    resource.type in encryption_required_services
    encryption := get_encryption_status(resource)
    validation := validate_encryption_strength(encryption)
    validation.compliant
}

get_encryption_status(resource) := result {
    resource.type == "aws_s3_bucket"
    result := {
        "at_rest": has_s3_encryption(resource),
        "in_transit": has_s3_ssl_policy(resource),
        "key_management": resource.attributes.kms_key_type
    }
}

get_encryption_status(resource) := result {
    resource.type == "aws_rds_instance"
    result := {
        "at_rest": resource.attributes.storage_encrypted,
        "in_transit": resource.attributes.force_ssl,
        "key_management": resource.attributes.kms_key_type
    }
}

validate_encryption_strength(encryption) := result {
    encryption.at_rest
    encryption.in_transit
    key_validation := validate_key_management(encryption.key_management)
    result := {
        "compliant": key_validation.approved,
        "strength": key_validation.strength,
        "recommendations": key_validation.recommendations
    }
}

validate_key_management("customer_managed") := {
    "approved": true,
    "strength": "high",
    "recommendations": []
}

validate_key_management("aws_managed") := {
    "approved": true,
    "strength": "medium",
    "recommendations": [
        "Consider migrating to customer managed keys for greater control",
        "Enable automatic key rotation"
    ]
}

validate_key_management(_) := {
    "approved": false,
    "strength": "low",
    "recommendations": [
        "Configure an approved KMS key",
        "Document ownership within the OSCAL profile"
    ]
}

network_security_violations[violation] {
    resource := input.resources[_]
    resource.type == "aws_security_group"
    violation := check_ingress_rules(resource)
    violation.severity in ["critical", "high"]
}

check_ingress_rules(sg) := violation {
    rule := sg.attributes.ingress[_]
    rule.cidr_blocks[_] == "0.0.0.0/0"
    rule.from_port in administrative_ports
    violation := {
        "type": "critical_port_exposure",
        "severity": "critical",
        "port": rule.from_port,
        "security_group": sg.attributes.name,
        "message": sprintf("Administrative port %v is exposed to the internet", [rule.from_port]),
        "remediation": "Restrict access to dedicated management networks",
        "reference": "MSB 3.2.1 Network Segmentation"
    }
}

check_ingress_rules(sg) := violation {
    rule := sg.attributes.ingress[_]
    rule.cidr_blocks[_] == "0.0.0.0/0"
    not rule.from_port in allowed_public_ports
    not rule.from_port in administrative_ports
    violation := {
        "type": "non_standard_port_exposure",
        "severity": "high",
        "port": rule.from_port,
        "security_group": sg.attributes.name,
        "message": sprintf("Non-standard port %v is exposed to the internet", [rule.from_port]),
        "remediation": "Validate the business requirement and narrow the CIDR range",
        "reference": "MSB 3.2.2 Minimal Exposure"
    }
}

data_sovereignty_compliant[resource] {
    resource := input.resources[_]
    resource.type in {
        "aws_s3_bucket", "aws_rds_instance", "aws_rds_cluster",
        "aws_dynamodb_table", "aws_elasticsearch_domain",
        "aws_redshift_cluster", "aws_efs_file_system"
    }
    classification := determine_classification(resource)
    result := validate_region(resource, classification)
    result.compliant
}

determine_classification(resource) := classification {
    classification := resource.attributes.tags["DataClassification"]
    classification != ""
}

determine_classification(resource) := "personal" {
    contains(lower(resource.attributes.name), "personal")
}

determine_classification(resource) := "personal" {
    resource.type in ["aws_rds_instance", "aws_rds_cluster"]
    indicators := {"user", "customer", "gdpr", "pii"}
    some indicator in indicators
    contains(lower(resource.attributes.identifier), indicator)
}

default determine_classification(_) := "internal"

validate_region(resource, "personal") := {
    "compliant": get_resource_region(resource) in eu_regions,
    "requirement": "GDPR Articles 44–49"
}

default validate_region(_, _) := {
    "compliant": true,
    "requirement": "Internal data — EU residency not mandatory"
}

get_resource_region(resource) := region {
    region := resource.attributes.region
    region != ""
}

get_resource_region(resource) := region {
    az := resource.attributes.availability_zone
    region := substring(az, 0, count(az) - 1)
}

default get_resource_region(_) := "unknown"

compliance_assessment := result {
    encryption_violations := [
        create_encryption_violation(resource) |
        resource := input.resources[_];
        resource.type in encryption_required_services;
        not encryption_compliant[resource]
    ]

    network_violations := [v | v := network_security_violations[_]]

    sovereignty_violations := [
        create_sovereignty_violation(resource) |
        resource := input.resources[_];
        resource.type in encryption_required_services;
        not data_sovereignty_compliant[resource]
    ]

    violations := array.concat(array.concat(encryption_violations, network_violations), sovereignty_violations)

    result := {
        "overall_score": calculate_score(violations),
        "violations": violations,
        "regulators": {
            "gdpr": assess_regulator("GDPR", violations),
            "msb": assess_regulator("MSB", violations),
            "iso27001": assess_regulator("ISO 27001", violations)
        }
    }
}

create_encryption_violation(resource) := {
    "type": "encryption_required",
    "severity": "critical",
    "resource": resource.type,
    "message": "Mandatory encryption is disabled",
    "remediation": "Enable encryption at rest and enforce TLS in transit",
    "reference": "GDPR Article 32"
}

create_sovereignty_violation(resource) := {
    "type": "data_sovereignty",
    "severity": "critical",
    "resource": resource.type,
    "message": sprintf("Personal data stored outside approved EU regions (%v)", [get_resource_region(resource)]),
    "remediation": "Move the workload to an EU region or document an adequacy decision",
    "reference": "GDPR Articles 44–49"
}

calculate_score(violations) := score {
    penalties := [penalty |
        violation := violations[_];
        penalty := severity_penalties[violation.severity]
    ]
    score := max([0, 100 - sum(penalties)])
}

severity_penalties := {
    "critical": 25,
    "high": 15,
    "medium": 10,
    "low": 5
}

assess_regulator(name, violations) := {
    "name": name,
    "open_findings": count([v | v := violations[_]; contains(lower(v.reference), lower(name))])
}
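
Inside a pipeline, the assessment above can act as a deployment gate. The wrapper below is a minimal sketch that shells out to `opa eval`, reads the overall score, and fails the build when it drops under a threshold; the package name `terraform.compliance`, the policy directory `policies/`, the plan export `plan.json`, and the 80-point threshold are illustrative assumptions rather than conventions fixed by the policy itself.

# ci/compliance_gate.py (illustrative; assumes the policy package is terraform.compliance)
"""Fail the pipeline when the OPA compliance score drops below a threshold."""
import json
import subprocess
import sys

MIN_SCORE = 80  # illustrative threshold, not mandated by the policy


def evaluate_compliance(policy_dir: str = "policies/", plan_file: str = "plan.json") -> dict:
    """Query the compliance_assessment document through the OPA CLI."""
    completed = subprocess.run(
        [
            "opa", "eval",
            "--format", "json",
            "--data", policy_dir,
            "--input", plan_file,
            "data.terraform.compliance.compliance_assessment",
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    result = json.loads(completed.stdout)
    return result["result"][0]["expressions"][0]["value"]


if __name__ == "__main__":
    assessment = evaluate_compliance()
    print(f"Overall compliance score: {assessment['overall_score']}")
    for violation in assessment["violations"]:
        print(f"- [{violation['severity']}] {violation['message']} ({violation['reference']})")
    if assessment["overall_score"] < MIN_SCORE:
        sys.exit("Compliance score below threshold; blocking deployment")

Because the CLI returns the full assessment document, the same wrapper can also publish the violation list as a build artefact for auditors.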

10_CODE_2: OSCAL profile for regulated financial services {#10_code_2}

Referenced from Chapter 10. This OSCAL profile merges controls from NIST SP 800-53 with GDPR Article 32 and MSB network segmentation expectations. Its parameter values pin down the encryption standard and key management practices the organisation has adopted.

{
  "profile": {
    "uuid": "87654321-4321-8765-4321-876543218765",
    "metadata": {
      "title": "Financial Institutions Security Profile",
      "published": "2024-01-15T11:00:00Z",
      "last-modified": "2024-01-15T11:00:00Z",
      "version": "2.1",
      "oscal-version": "1.1.2",
      "props": [
        { "name": "organization", "value": "Financial Sector" },
        { "name": "jurisdiction", "value": "EU" }
      ]
    },
    "imports": [
      {
        "href": "https://raw.githubusercontent.com/usnistgov/oscal-content/main/nist.gov/SP800-53/rev5/json/NIST_SP-800-53_rev5_catalog.json",
        "include-controls": [
          { "matching": [ { "pattern": "ac-.*" }, { "pattern": "au-.*" }, { "pattern": "sc-.*" } ] }
        ]
      },
      {
        "href": "regional-catalog.json",
        "include-controls": [
          { "matching": [ { "pattern": "gdpr-.*" }, { "pattern": "msb-.*" } ] }
        ]
      }
    ],
    "merge": {
      "combine": { "method": "merge" }
    },
    "modify": {
      "set-parameters": [
        {
          "param-id": "gdpr-art32-1_prm1",
          "values": ["AES-256-GCM"]
        },
        {
          "param-id": "gdpr-art32-1_prm2",
          "values": ["AWS KMS customer managed keys backed by HSM"]
        },
        {
          "param-id": "msb-3.2.1_prm1",
          "values": ["Zero Trust segmentation enforced via AWS Network Firewall"]
        }
      ],
      "alters": [
        {
          "control-id": "gdpr-art32-1",
          "adds": [
            {
              "position": "after",
              "by-id": "gdpr-art32-1_gdn",
              "parts": [
                {
                  "id": "gdpr-art32-1_fin-guidance",
                  "name": "guidance",
                  "title": "Finansinspektionen Supplement",
                  "prose": "Payment service providers must use FIPS 140-2 validated encryption modules and review key material every 90 days."
                }
              ]
            }
          ]
        },
        {
          "control-id": "msb-3.2.1",
          "adds": [
            {
              "position": "after",
              "by-id": "msb-3.2.1_gdn",
              "parts": [
                {
                  "id": "msb-3.2.1_fin-requirement",
                  "name": "requirement",
                  "title": "Financial Sector Isolation",
                  "prose": "Critical payment workloads must be isolated in dedicated network segments with inspection by AWS Network Firewall and VPC Traffic Mirroring."
                }
              ]
            }
          ]
        }
      ]
    }
  }
}
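
Downstream tooling usually reads the `modify` block to learn which parameters an organisation has pinned and which controls carry extra guidance. The reader below is a minimal sketch assuming the profile above is stored as `profiles/financial-profile.json`; both the path and the script name are illustrative.

# oscal/describe_profile.py (illustrative; assumes the profile is saved as profiles/financial-profile.json)
"""Summarise the local tailoring captured in an OSCAL profile's modify block."""
import json
from pathlib import Path

profile = json.loads(Path("profiles/financial-profile.json").read_text(encoding="utf-8"))
modify = profile["profile"].get("modify", {})

# Parameters the organisation has pinned, such as the encryption standard and key management.
for parameter in modify.get("set-parameters", []):
    print(f"{parameter['param-id']}: {', '.join(parameter['values'])}")

# Controls that gain organisation-specific guidance or requirements.
for alteration in modify.get("alters", []):
    added = [part["title"] for add in alteration.get("adds", []) for part in add.get("parts", [])]
    print(f"{alteration['control-id']} adds: {', '.join(added)}")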

10_CODE_3: OSCAL component definitions for reusable cloud modules {#10_code_3}

Referenced from Chapter 10. Component definitions document how Terraform modules satisfy regulatory expectations. This example captures Amazon RDS, Amazon S3, and AWS Network Firewall implementations used throughout the financial profile.

{
  "component-definition": {
    "uuid": "11223344-5566-7788-99aa-bbccddeeff00",
    "metadata": {
      "title": "AWS Components for Regulated Workloads",
      "published": "2024-01-15T12:00:00Z",
      "last-modified": "2024-01-15T12:00:00Z",
      "version": "1.5",
      "oscal-version": "1.1.2"
    },
    "components": [
      {
        "uuid": "comp-aws-rds-mysql",
        "type": "software",
        "title": "Amazon RDS MySQL",
        "description": "Managed relational database configured for GDPR Article 32 compliance.",
        "control-implementations": [
          {
            "source": "regional-catalog.json",
            "implemented-requirements": [
              {
                "control-id": "gdpr-art32-1.1",
                "description": "Encryption at rest enabled with customer managed KMS keys.",
                "statements": [
                  {
                    "statement-id": "gdpr-art32-1.1_smt",
                    "description": "Default encryption uses AES-256 with keys rotated every 365 days.",
                    "implementation-status": { "state": "implemented" }
                  }
                ],
                "props": [
                  { "name": "kms-key-type", "value": "customer-managed" },
                  { "name": "rotation", "value": "365 days" }
                ]
              },
              {
                "control-id": "msb-3.2.1.1",
                "description": "Database subnet groups isolated from public subnets.",
                "statements": [
                  {
                    "statement-id": "msb-3.2.1.1_smt",
                    "description": "Only application load balancers within the VPC may initiate connections.",
                    "implementation-status": { "state": "implemented" }
                  }
                ]
              }
            ]
          }
        ]
      },
      {
        "uuid": "comp-aws-s3",
        "type": "software",
        "title": "Amazon S3 Secure Bucket",
        "description": "Object storage with automatic encryption and access logging.",
        "control-implementations": [
          {
            "source": "regional-catalog.json",
            "implemented-requirements": [
              {
                "control-id": "gdpr-art32-1.2",
                "description": "Enforced TLS 1.2 for all data in transit.",
                "statements": [
                  {
                    "statement-id": "gdpr-art32-1.2_smt",
                    "description": "S3 bucket policies block non-TLS requests and log denials.",
                    "implementation-status": { "state": "implemented" }
                  }
                ]
              },
              {
                "control-id": "msb-3.2.1.2",
                "description": "Zero Trust verification for access using IAM conditions.",
                "statements": [
                  {
                    "statement-id": "msb-3.2.1.2_smt",
                    "description": "IAM policies require device posture attributes for privileged access.",
                    "implementation-status": { "state": "planned" }
                  }
                ]
              }
            ]
          }
        ]
      },
      {
        "uuid": "comp-aws-network-firewall",
        "type": "software",
        "title": "AWS Network Firewall",
        "description": "Edge inspection enforcing MSB segmentation and logging requirements.",
        "control-implementations": [
          {
            "source": "regional-catalog.json",
            "implemented-requirements": [
              {
                "control-id": "msb-3.2.1",
                "description": "Micro-segmentation between payment and support zones.",
                "statements": [
                  {
                    "statement-id": "msb-3.2.1_smt",
                    "description": "Stateful rules restrict lateral movement and mirror traffic to a central collector.",
                    "implementation-status": { "state": "implemented" }
                  }
                ]
              }
            ]
          }
        ]
      }
    ]
  }
}
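
Because every statement carries a machine-readable implementation status, coverage across a component library can be reported automatically. The sketch below, which assumes the definition above is stored as `oscal/components/aws-components.json`, counts implemented versus planned statements per component; the path is an assumption chosen for illustration.

# oscal/component_coverage.py (illustrative; assumes the definition is stored as oscal/components/aws-components.json)
"""Count implementation states per component in an OSCAL component definition."""
import json
from collections import Counter
from pathlib import Path

definition = json.loads(Path("oscal/components/aws-components.json").read_text(encoding="utf-8"))

for component in definition["component-definition"]["components"]:
    states = Counter()
    for implementation in component.get("control-implementations", []):
        for requirement in implementation.get("implemented-requirements", []):
            for statement in requirement.get("statements", []):
                states[statement["implementation-status"]["state"]] += 1
    print(f"{component['title']}: {dict(states)}")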

10_CODE_4: Automated OSCAL System Security Plan generator {#10_code_4}

Referenced from Chapter 10. This Python utility parses the Terraform configuration, merges it with OSCAL component definitions, and produces a machine-readable System Security Plan. The script is designed to run inside CI so that every deployment refreshes the compliance evidence set.

"""Generate OSCAL System Security Plans from Terraform configurations."""
from __future__ import annotations

import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List

import boto3
import hcl2


@dataclass
class ComponentDefinition:
    """Representation of an OSCAL component definition file."""

    path: Path
    content: Dict[str, Any]


class OSCALSSPGenerator:
    """Build an OSCAL-compliant System Security Plan from source files."""

    def __init__(self, terraform_directory: Path, component_paths: Iterable[Path]):
        self.terraform_directory = terraform_directory
        self.component_paths = list(component_paths)
        self.sts = boto3.client("sts")

    def generate(self, profile_href: str, system_name: str) -> Dict[str, Any]:
        resources = self._parse_terraform()
        components = self._load_components()
        mappings = self._map_resources_to_components(resources, components)
        implementations = self._build_control_implementations(mappings)

        now = datetime.utcnow().isoformat() + "Z"
        return {
            "system-security-plan": {
                "uuid": self._uuid(),
                "metadata": {
                    "title": f"System Security Plan – {system_name}",
                    "published": now,
                    "last-modified": now,
                    "version": "1.0",
                    "oscal-version": "1.1.2",
                    "props": [
                        {"name": "organization", "value": "Enterprise"},
                        {"name": "system-name", "value": system_name}
                    ]
                },
                "import-profile": {"href": profile_href},
                "system-characteristics": {
                    "system-ids": [
                        {
                            "identifier-type": "https://ietf.org/rfc/rfc4122",
                            "id": self._account_id()
                        }
                    ],
                    "system-name": system_name,
                    "description": f"Automated SSP generated from Terraform for {system_name}",
                    "security-sensitivity-level": "moderate"
                },
                "control-implementation": {
                    "implemented-requirements": implementations
                }
            }
        }

    def _parse_terraform(self) -> List[Dict[str, Any]]:
        resources: List[Dict[str, Any]] = []
        for tf_file in self.terraform_directory.rglob("*.tf"):
            with tf_file.open("r", encoding="utf-8") as handle:
                data = hcl2.load(handle)
                resources.extend(data.get("resource", []))
        return resources

    def _load_components(self) -> List[ComponentDefinition]:
        components: List[ComponentDefinition] = []
        for path in self.component_paths:
            with path.open("r", encoding="utf-8") as handle:
                components.append(ComponentDefinition(path, json.load(handle)))
        return components

    def _map_resources_to_components(
        self,
        resources: List[Dict[str, Any]],
        components: List[ComponentDefinition],
    ) -> Dict[str, ComponentDefinition]:
        mappings: Dict[str, ComponentDefinition] = {}
        for resource in resources:
            resource_type = next(iter(resource.keys()))
            for component in components:
                # Coarse match: a component is relevant if its JSON mentions the Terraform resource type.
                if resource_type in json.dumps(component.content):
                    mappings[resource_type] = component
        return mappings

    def _build_control_implementations(
        self, mappings: Dict[str, ComponentDefinition]
    ) -> List[Dict[str, Any]]:
        implementations: List[Dict[str, Any]] = []
        for component in mappings.values():
            definition = component.content["component-definition"]
            for comp in definition.get("components", []):
                implementations.extend(
                    comp.get("control-implementations", [])
                )
        return implementations

    def _uuid(self) -> str:
        # OSCAL requires RFC 4122 UUIDs; a timestamp string would fail schema validation.
        return str(uuid.uuid4())

    def _account_id(self) -> str:
        return self.sts.get_caller_identity()["Account"]


def load_component_paths(directory: Path) -> List[Path]:
    return sorted(directory.glob("*.json"))


def save_ssp(output_path: Path, ssp: Dict[str, Any]) -> None:
    output_path.write_text(json.dumps(ssp, indent=2), encoding="utf-8")


if __name__ == "__main__":
    tf_dir = Path("infrastructure")
    components_dir = Path("oscal/components")
    generator = OSCALSSPGenerator(tf_dir, load_component_paths(components_dir))
    plan = generator.generate(
        profile_href="profiles/financial-profile.json",
        system_name="Payments Platform"
    )
    save_ssp(Path("artifacts/system-security-plan.json"), plan)
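
Before the generated plan is archived as evidence, it is worth asserting that it carries the sections auditors expect. The check below is a minimal sketch that reuses the artefact path from the `__main__` block above; it is a sanity check only, not a replacement for validating the document against the official OSCAL schema.

# oscal/check_ssp.py (illustrative sanity check, not full OSCAL schema validation)
"""Verify that a generated System Security Plan contains the expected top-level sections."""
import json
from pathlib import Path

ssp = json.loads(Path("artifacts/system-security-plan.json").read_text(encoding="utf-8"))
plan = ssp["system-security-plan"]

required_sections = ["uuid", "metadata", "import-profile", "system-characteristics", "control-implementation"]
missing = [section for section in required_sections if section not in plan]
assert not missing, f"Generated SSP is missing required sections: {missing}"
assert plan["control-implementation"]["implemented-requirements"], "SSP contains no implemented requirements"
print("SSP structure looks complete; it can be archived as compliance evidence")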