Nhảy tới nội dung

Tasks and Workflow

State Machine Task definition

# ===================
# Step Functions Tasks
# ===================

# Task: Create Import Job
create_import_job_task = tasks.LambdaInvoke(
self, "CreateImportJob",
lambda_function=create_import_job_fn,
output_path="$.Payload"
)

# Task: Check Import Status
check_import_status_task = tasks.LambdaInvoke(
self, "CheckImportStatus",
lambda_function=check_import_status_fn,
output_path="$.Payload"
)

# Task: Create Solution
create_solution_task = tasks.LambdaInvoke(
self, "CreateSolution",
lambda_function=create_solution_fn,
output_path="$.Payload"
)

# Task: Check Solution Status
check_solution_status_task = tasks.LambdaInvoke(
self, "CheckSolutionStatus",
lambda_function=check_solution_status_fn,
output_path="$.Payload"
)

# Task: Create Campaign
create_campaign_task = tasks.LambdaInvoke(
self, "CreateCampaign",
lambda_function=create_campaign_fn,
output_path="$.Payload"
)

# Wait states for job import
wait_for_import = sfn.Wait(
self, "WaitForImport",
time=sfn.WaitTime.duration(Duration.minutes(2))
)

# Wait for model training
wait_for_solution = sfn.Wait(
self, "WaitForSolution",
time=sfn.WaitTime.duration(Duration.minutes(5))
)

# Choice states
is_import_complete = sfn.Choice(self, "IsImportComplete")
is_solution_complete = sfn.Choice(self, "IsSolutionComplete")

# Success/Fail states
success_state = sfn.Succeed(self, "PipelineSucceeded")
fail_state = sfn.Fail(self, "PipelineFailed", error="PipelineError")

Workflow

# ===================
# Define Workflow
# ===================
definition = (
create_import_job_task
.next(wait_for_import)
.next(check_import_status_task)
.next(
is_import_complete
.when(
sfn.Condition.string_equals("$.status", "ACTIVE"),
create_solution_task
)
.when(
sfn.Condition.string_equals("$.status", "CREATE FAILED"),
fail_state
)
.otherwise(wait_for_import)
)
)

create_solution_task.next(wait_for_solution)
.next(check_solution_status_task)
.next(
is_solution_complete
.when(
sfn.Condition.string_equals("$.status", "ACTIVE"),
create_campaign_task.next(success_state)
)
.when(
sfn.Condition.string_equals("$.status", "CREATE FAILED"),
fail_state
)
.otherwise(wait_for_solution)
)

State Machine Created

# ===================
# State Machine
# ===================
state_machine = sfn.StateMachine(
self, "PersonalizePipeline",
state_machine_name="personalize-training-pipeline",
definition=definition,
timeout=Duration.hours(3),
logs=sfn.LogOptions(
destination=logs.LogGroup(self, "PipelineLogs"),
level=sfn.LogLevel.ALL
)
)