Skip to main content

Lambda function and Role

Lambda execution role

# ===================
# Lambda Execution Role
# ===================
lambda_role = iam.Role(
self, "LambdaRole",
assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSLambdaBasicExecutionRole"
)
]
)
lambda_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"personalize:*",
"iam:PassRole"
],
resources=["*"]
)
)

Lambda functions for State Machine

# ===================
# Lambda Functions
# ===================

# Create Dataset Import Job
create_import_job_fn = lambda_.Function(
self, "CreateImportJobFunction",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="index.handler",
code=lambda_.Code.from_inline(self._get_import_job_code()),
role=lambda_role,
timeout=Duration.minutes(5),
environment={
"DATASET_ARN": dataset_arn,
"ROLE_ARN": personalize_role.role_arn,
"BUCKET_NAME": data_bucket.bucket_name
}
)

# Check Import Job Status
check_import_status_fn = lambda_.Function(
self, "CheckImportStatusFunction",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="index.handler",
code=lambda_.Code.from_inline(self._get_check_import_status_code()),
role=lambda_role,
timeout=Duration.minutes(1)
)

# Create Solution
create_solution_fn = lambda_.Function(
self, "CreateSolutionFunction",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="index.handler",
code=lambda_.Code.from_inline(self._get_create_solution_code()),
role=lambda_role,
timeout=Duration.minutes(5),
environment={
"DATASET_GROUP_ARN": dataset_group_arn
}
)

# Check Solution Status
check_solution_status_fn = lambda_.Function(
self, "CheckSolutionStatusFunction",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="index.handler",
code=lambda_.Code.from_inline(self._get_check_solution_status_code()),
role=lambda_role,
timeout=Duration.minutes(1)
)

# Create Campaign
create_campaign_fn = lambda_.Function(
self, "CreateCampaignFunction",
runtime=lambda_.Runtime.PYTHON_3_12,
handler="index.handler",
code=lambda_.Code.from_inline(self._get_create_campaign_code()),
role=lambda_role,
timeout=Duration.minutes(5)
)