DK
Size: a a a
AT
K
ФТ
ФТ
K
def get_messages_from_queue(queue_url):
    """Generate messages from an SQS queue, deleting each batch once consumed.

    Note: this continues to generate messages until the queue is empty.
    Every message on the queue will be deleted.

    Messages are fetched in batches of up to 10. A batch is deleted from the
    queue only after the consumer has iterated past its last message, so if
    the consumer stops early the current batch is not deleted here.

    :param queue_url: URL of the SQS queue to drain.
    :raises RuntimeError: if the delete response does not report a successful
        deletion for every message in the batch.
    """
    sqs_client = boto3.client("sqs")

    while True:
        # WaitTimeSeconds asks SQS to wait for messages before answering
        # instead of returning immediately.
        resp = sqs_client.receive_message(
            QueueUrl=queue_url,
            AttributeNames=["All"],
            MaxNumberOfMessages=10,
            WaitTimeSeconds=10,
        )

        # A response with no (or empty) "Messages" means there is nothing
        # left to hand out. Looking the key up outside of the ``yield from``
        # keeps unrelated KeyErrors from being mistaken for "queue is empty",
        # and avoids sending an empty batch to delete_message_batch.
        messages = resp.get("Messages")
        if not messages:
            return

        yield from messages

        entries = [
            {"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
            for msg in messages
        ]
        resp = sqs_client.delete_message_batch(
            QueueUrl=queue_url, Entries=entries
        )

        # Use .get so that a response lacking "Successful" is reported through
        # the RuntimeError below (with full context) rather than a KeyError.
        if len(resp.get("Successful", [])) != len(entries):
            raise RuntimeError(
                f"Failed to delete messages: entries={entries!r} resp={resp!r}"
            )
from pprint import pprint
from time import sleep
# Poll forever: each pass of the outer loop drains whatever the generator
# yields; once the generator returns, the outer loop starts a fresh pass.
while True:
    for message in get_messages_from_queue(QUEUE_URL):
        pprint(f"STARTING PROCESSING MESSAGE {message['Body']}")
        # NOTE(review): the 5 second sleep presumably stands in for real
        # per-message processing work — confirm before reuse.
        sleep(5)
        print(f"PROCESSING MESSAGE {message['Body']} COMPLETED")
ФТ
ФТ
ФТ
ФТ
ФТ
K
ФТ