Building Asynchronous Microservices with Tornado

Tim Jensen

uStudio

Example Code

https://github.com/tjensen/PyTexas2017

Requires Python 3.6 to run

Tornado

http://www.tornadoweb.org/

Web framework

Asynchronous networking library

Integrates with asyncio and Twisted

Supports Python 2.7 and 3.3+

Why Asynchronous?

Concurrency

Lightweight

Async/await makes it easier

Structure

Application

HTTP Server

IO Loop

Request Handlers

Application

Defines routes

Configures initial state

Application

Example:

def make_app(config):
    return tornado.web.Application([
        ("/api/v1/bart", BartHandler),
        ("/api/v1/flanders", FlandersHandler),
        ("/api/v1/healthcheck", HealthcheckHandler),
        ("/api/v1/homers/(.*)", HomersHandler),
        ("/api/v1/lisas/(.*)", LisasHandler),
        ("/api/v1/maggie", MaggieHandler),
        ("/api/v1/marges/(.*)", MargesHandler)
    ], **config)

# ...snip...
app = make_app({"mongo_db": mongo_db})

HTTP Server

Enables HTTP access to Application

Listens on port

    server = tornado.httpserver.HTTPServer(app)
    server.listen(int(environ["SERVER_PORT"]), "localhost")

IO Loop

Start the IO loop to make it run

ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()

Request Handlers

Where work is done

class FlandersHandler(tornado.web.RequestHandler):
    def get(self):
        self.finish("Hi-dilly-ho, neighborino!")

MongoDB

Motor

Motor Logo

Use Motor to asynchronously interface with MongoDB.

API like PyMongo but with futures

Setup

Main creates MotorClient

motor_client = motor.motor_tornado.MotorClient(
    environ["MONGODB_URI"])

…and gets default database

mongo_db = motor_client.get_default_database()

Reading

GET handler calls find_one

class HomersHandler(tornado.web.RequestHandler):
    async def get(self, name):
        homer = await self.settings["mongo_db"].homers.find_one(
            {"name": name})

        if homer is None:
            raise tornado.web.HTTPError(
                404, f"Missing homer: {name}")

        self.finish(homer["content"])

Writing

POST handler calls replace_one

    async def post(self, name):
        await self.settings["mongo_db"].homers.replace_one(
            {"name": name},
            {
                "name": name,
                "content": json.loads(self.request.body)
            },
            upsert=True)

        self.set_status(204)
        self.finish()

Redis

aioredis

asyncio Redis client library

https://aioredis.readthedocs.io

Part of the aio-libs project: https://github.com/aio-libs

IO Loop

Configure Tornado to use asyncio event loop

tornado.platform.asyncio.AsyncIOMainLoop().install()

Setup

async def connect_redis(environ):
    return await aioredis.create_redis(
        (
            environ["REDIS_HOST"], environ["REDIS_PORT"]
        ))

Use run_sync to call before IO loop has started

redis = ioloop.run_sync(functools.partial(
    connect_redis, environ))

Reading

class MargesHandler(tornado.web.RequestHandler):
    async def get(self, name):
        marge = await self.settings["redis"].get(name)

        if marge is None:
            raise tornado.web.HTTPError(
                404, f"Missing marge: {name}")

        self.finish(marge)

Writing

    async def post(self, name):
        await self.settings["redis"].set(name, self.request.body)

        self.set_status(204)
        self.finish()

AWS S3

Boto 3

https://boto3.readthedocs.io

Not asynchronous

but we can make it work!

Wrapper Class

Create an asynchronous wrapper class

Use a Thread Pool Executor

class S3Object(object):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def __init__(self, bucket, key):
        self.bucket = boto3.resource("s3").Bucket(bucket)
        self.key = key

Run on Executor

Wrap the synchronous method with run_on_executor

    @tornado.concurrent.run_on_executor
    def _upload(self, data):
        self.bucket.upload_fileobj(
            Key=self.key,
            Fileobj=io.BytesIO(data))

    async def upload(self, data):
        await tornado.platform.asyncio.to_tornado_future(
            self._upload(data))

Request Handler

Now handler can call wrapper’s asynchronous method

    async def post(self):
        await self.settings["s3_object"].upload(self.request.body)

        self.set_status(204)
        self.finish()

AsyncHTTPClient

Simple client suitable for most needs

pycurl-based implementation also available

Raises on error, by default

HTTP Client Example

class MaggieHandler(tornado.web.RequestHandler):
    async def get(self):
        client = tornado.httpclient.AsyncHTTPClient()
        response = await client.fetch(self.settings["weather_uri"])

        body = json.loads(response.body)
        item = body["query"]["results"]["channel"]["item"]
        condition = item["condition"]
        temp = condition["temp"]
        text = condition["text"]

        self.finish(
            f"Currently {temp} degrees and {text} in Austin, TX")

Multiple Operations in Parallel

Use multi to run multiple asynchronous operations in parallel:

        await tornado.gen.multi([
            self.settings["mongo_db"].command("ping"),
            self.settings["redis"].ping(),
            self.check_mysql()
        ])

Thanks!

https://github.com/tjensen/PyTexas2017

Visit the uStudio booth!