使用 Amazon SQS — Python 文档
使用亚马逊 SQS
安装
对于 Amazon SQS 支持,您必须安装其他依赖项。 您可以使用 celery[sqs]
bundle 一次性安装 Celery 和这些依赖项:
$ pip install celery[sqs]
配置
您必须在代理 URL 中指定 SQS:
broker_url = 'sqs://ABCDEFGHIJKLMNOPQRST:ZYXK7NiynGlTogH8Nj+P9nlE73sq3@'
其中 URL 格式为:
sqs://aws_access_key_id:aws_secret_access_key@
请注意,您必须记住在末尾包含 @
符号并对密码进行编码,以便始终可以正确解析。 例如:
from kombu.utils.url import safequote
aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")
broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
也可以使用环境变量 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
设置登录凭据,在这种情况下,代理 URL 可能仅为 [ X157X]。
如果您在实例上使用 IAM 角色,您可以将 BROKER_URL 设置为:sqs://
,kombu 将尝试从实例元数据中检索访问令牌。
选项
地区
默认区域是 us-east-1
但您可以通过配置 :setting:`broker_transport_options` 设置来选择另一个区域:
broker_transport_options = {'region': 'eu-west-1'}
可见性超时
可见性超时定义了在将消息重新传递给另一个工作人员之前等待工作人员确认任务的秒数。 另请参阅以下注意事项。
这个选项是通过 :setting:`broker_transport_options` 设置来设置的:
broker_transport_options = {'visibility_timeout': 3600} # 1 hour.
默认可见性超时为 30 分钟。
轮询间隔
轮询间隔决定了轮询失败之间休眠的秒数。 该值可以是 int 或 float。 默认情况下,该值为 一秒 :这意味着当没有更多消息要读取时,worker 将休眠一秒钟。
你必须注意更频繁的轮询也更昂贵,所以增加轮询间隔可以节省你的钱。
轮询间隔可以通过 :setting:`broker_transport_options` 设置来设置:
broker_transport_options = {'polling_interval': 0.3}
非常频繁的轮询间隔会导致 繁忙循环 ,从而导致工作人员使用大量 CPU 时间。 如果您需要亚毫秒级精度,您应该考虑使用另一种传输方式,例如兔MQ , 或者 Redis .
长轮询
SQS长轮询默认开启,ReceiveMessage操作的WaitTimeSeconds
参数设置为10秒。
WaitTimeSeconds
参数的值可以通过 :setting:`broker_transport_options` 设置来设置:
broker_transport_options = {'wait_time_seconds': 15}
有效值为 0 到 20。 请注意,新创建的队列本身(如果由 Celery 创建)将为“接收消息等待时间”队列属性设置默认值 0。
队列前缀
默认情况下,Celery 不会为队列名称分配任何前缀,如果您有其他使用 SQS 的服务,您可以使用 :setting:`broker_transport_options` 设置进行配置:
broker_transport_options = {'queue_name_prefix': 'celery-'}
预定义队列
如果您希望 Celery 在 AWS 中使用一组预定义队列,并且从不尝试列出 SQS 队列,也不尝试创建或删除它们,请使用 :setting:`predefined_queues` 将队列名称映射传递给 URL 设置:
broker_transport_options = {
'predefined_queues': {
'my-q': {
'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
'access_key_id': 'xxx',
'secret_access_key': 'xxx',
}
}
}
退让政策
回退策略使用 SQS 可见性超时机制来改变任务重试之间的时间差。 该机制将消息特定的 visibility timeout
从队列 Default visibility timeout
更改为策略配置的超时。 重试次数由 SQS 管理(具体由 ApproximateReceiveCount
消息属性)管理,用户无需进一步操作。
配置队列和退避策略:
broker_transport_options = {
'predefined_queues': {
'my-q': {
'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
'access_key_id': 'xxx',
'secret_access_key': 'xxx',
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
'backoff_tasks': ['svc.tasks.tasks.task1']
}
}
}
backoff_policy
字典,其中键是重试次数,值是重试之间的延迟秒数(即 SQS 可见性超时) backoff_tasks
应用上述策略的任务名称列表
以上政策:
试图 | 延迟 |
2nd attempt
|
20 秒 |
3rd attempt
|
40 秒 |
4th attempt
|
80 秒 |
5th attempt
|
320 秒 |
6th attempt
|
640 秒 |
STS令牌认证
https://docs.aws.amazon.com/cli/latest/reference/sts/assume-role.html
使用 sts_role_arn
和 sts_token_timeout
代理传输选项支持 AWS STS 身份验证。 sts_role_arn
是我们用来授权访问 SQS 的假定 IAM 角色 ARN。 sts_token_timeout
是令牌超时,默认(和最小值)为 900 秒。 在上述时间段之后,将创建一个新令牌。
- broker_transport_options = {
- “预定义队列”:{
- '我的-Q':{
'url':'https://ap-southeast-2.queue.amazonaws.com/123456/my-q','access_key_id':'xxx','secret_access_key':'xxx', 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, 'backoff_tasks': ['svc.tasks.tasks.task1']
}
},
'sts_role_arn': 'arn:aws:iam:: :role/STSTest', # 可选 'sts_token_timeout': 900 # 可选 }
注意事项
如果任务在
visibility_timeout
内没有得到确认,则该任务将重新交付给另一个 worker 并执行。这会导致执行时间超过可见性超时的 ETA/倒计时/重试任务出现问题; 事实上,如果发生这种情况,它将再次执行,并再次循环执行。
因此,您必须增加可见性超时以匹配您计划使用的最长 ETA 时间。
请注意,Celery 将在 worker 关闭时重新传递消息,因此长时间的可见性超时只会在发生电源故障或强制终止 worker 时延迟“丢失”任务的重新传递。
周期性任务不会受到可见性超时的影响,因为它是一个独立于 ETA/倒计时的概念。
在撰写本文时,AWS 支持的最大可见性超时为 12 小时(43200 秒):
broker_transport_options = {'visibility_timeout': 43200}
SQS 尚不支持工作人员远程控制命令。
SQS 尚不支持事件,因此不能与 celery events、celerymon 或 Django 管理监视器一起使用。
结果
Amazon Web Services 系列中的多个产品可能是存储或发布结果的理想选择,但此时不包含此类结果后端。
警告
不要将 amqp
结果后端与 SQS 一起使用。
它将为每个任务创建一个队列,并且不会收集队列。 这可能会花费你的钱,最好将 AWS 结果存储后端贡献给 Celery :)