Source code for gratipay.sync_npm

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals

import requests
from couchdb import Database

from gratipay.models.package import NPM, Package


def get_last_seq(db):
    return'SELECT npm_last_seq FROM worker_coordination')

[docs]def production_change_stream(seq): """Given a sequence number in the npm registry change stream, start streaming from there! """ return Database(REGISTRY_URL).changes(feed='continuous', include_docs=True, since=seq)
[docs]def process_doc(doc): """Return a smoothed-out doc, or None if it's not a package doc, meaning there's no name key and it's probably a design doc, per: """ if 'name' not in doc: return None name = doc['name'] description = doc.get('description', '') emails = [e for e in [m.get('email') for m in doc.get('maintainers', [])] if e.strip()] return {'name': name, 'description': description, 'emails': sorted(set(emails))}
[docs]def consume_change_stream(stream, db): """Given an iterable of CouchDB change notifications and a :py:class:`~GratipayDB`, read from the stream and write to the db. The npm registry is a CouchDB app, which means we get a change stream from it that allows us to follow registry updates in near-realtime. Our strategy here is to maintain open connections to both the registry and our own database, and write as we read. """ with db.get_connection() as connection: for change in stream: # Decide what to do. if change.get('deleted'): package = Package.from_names(NPM, change['id']) if not package: # As a result of CouchDB's compaction algorithm, we might # receive 'deleted' events for docs even if we haven't seen # the corresponding events for when the doc was created continue op, kw = package.delete, {} else: op = Package.upsert doc = change.get('doc') if not doc: continue # We've seen this in the wild. kw = process_doc(doc) if not kw: continue kw['package_manager'] = NPM # Do it. cursor = connection.cursor() kw['cursor'] = cursor op(**kw)'UPDATE worker_coordination SET npm_last_seq=%(seq)s', change) connection.commit()
def check(db, _print=print): ours ='SELECT npm_last_seq FROM worker_coordination') theirs = int(requests.get(REGISTRY_URL).json()['update_seq']) _print("count#npm-sync-lag={}".format(theirs - ours))