def _get_import_job_code(self):
return '''
import boto3
import os
import time
def handler(event, context):
personalize = boto3.client("personalize")
job_name = f"import-job-{int(time.time())}"
response = personalize.create_dataset_import_job(
jobName=job_name,
datasetArn=os.environ["DATASET_ARN"],
dataSource={
"dataLocation": f"s3://{os.environ['BUCKET_NAME']}/data/interactions.csv"
},
roleArn=os.environ["ROLE_ARN"]
)
return {
"importJobArn": response["datasetImportJobArn"],
"status": "CREATE PENDING"
}
'''
def _get_check_import_status_code(self):
return '''
import boto3
def handler(event, context):
personalize = boto3.client("personalize")
response = personalize.describe_dataset_import_job(
datasetImportJobArn=event["importJobArn"]
)
return {
"importJobArn": event["importJobArn"],
"status": response["datasetImportJob"]["status"]
}
'''
def _get_create_solution_code(self):
return '''
import boto3
import os
import time
def handler(event, context):
personalize = boto3.client("personalize")
solution_name = f"solution-{int(time.time())}"
# Create Solution
solution_response = personalize.create_solution(
name=solution_name,
datasetGroupArn=os.environ["DATASET_GROUP_ARN"],
recipeArn="arn:aws:personalize:::recipe/aws-user-personalization"
)
# Create Solution Version
version_response = personalize.create_solution_version(
solutionArn=solution_response["solutionArn"]
)
return {
"solutionArn": solution_response["solutionArn"],
"solutionVersionArn": version_response["solutionVersionArn"],
"status": "CREATE PENDING"
}
'''
def _get_check_solution_status_code(self):
return '''
import boto3
def handler(event, context):
personalize = boto3.client("personalize")
response = personalize.describe_solution_version(
solutionVersionArn=event["solutionVersionArn"]
)
return {
"solutionArn": event["solutionArn"],
"solutionVersionArn": event["solutionVersionArn"],
"status": response["solutionVersion"]["status"]
}
'''
def _get_create_campaign_code(self):
return '''
import boto3
import time
def handler(event, context):
personalize = boto3.client("personalize")
campaign_name = f"campaign-{int(time.time())}"
response = personalize.create_campaign(
name=campaign_name,
solutionVersionArn=event["solutionVersionArn"],
minProvisionedTPS=1
)
return {
"campaignArn": response["campaignArn"],
"solutionVersionArn": event["solutionVersionArn"],
"status": "CREATED"
}
'''