Source code for syncthing

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# >>
#     Copyright (c) 2016-2017, Blake VandeMerwe
#
#       Permission is hereby granted, free of charge, to any person obtaining
#       a copy of this software and associated documentation files
#       (the "Software"), to deal in the Software without restriction,
#       including without limitation the rights to use, copy, modify, merge,
#       publish, distribute, sublicense, and/or sell copies of the Software,
#       and to permit persons to whom the Software is furnished to do so, subject
#       to the following conditions: The above copyright notice and this permission
#       notice shall be included in all copies or substantial portions
#       of the Software.
#
#     python-syncthing, 2016
# <<
from __future__ import unicode_literals

import os
import sys
import json
import logging
import warnings
from collections import namedtuple

import requests
from dateutil.parser import parse as dateutil_parser
from requests.exceptions import ConnectionError, ConnectTimeout

PY2 = sys.version_info[0] < 3

if PY2:
    # ``basestring`` and ``unicode`` only exist on Python 2; this branch is
    # never evaluated on Python 3.
    string_types = (basestring, str, unicode,)
else:
    string_types = (str,)


def reraise(msg, exc):
    """ Raises a :class:`SyncthingError` carrying ``msg``, chained to ``exc``.

        A single implementation is used for both Python 2 and 3: the
        ``raise ... from ...`` statement is a syntax error on Python 2, so the
        cause is attached through the ``__cause__`` attribute instead. On
        Python 3 this is equivalent to ``raise SyncthingError(msg) from exc``.

        Args:
            msg (str): message of the new exception.
            exc (Exception): the original exception being wrapped.

        Raises:
            SyncthingError: always.
    """
    err = SyncthingError(msg)
    err.__cause__ = exc
    raise err

# module-level logger shared by every API class in this file
logger = logging.getLogger(__name__)


NoneType = type(None)
# default HTTP timeout, in seconds, used by the API classes
DEFAULT_TIMEOUT = 10.0

# ``Events`` and ``Misc`` are public classes of this module as well (they are
# handed out by ``Syncthing.events()`` and ``Syncthing.misc``), so they are
# exported alongside the other endpoint classes.
__all__ = ['SyncthingError', 'ErrorEvent', 'BaseAPI', 'System',
           'Database', 'Events', 'Statistics', 'Misc', 'Syncthing',
           # methods
           'keys_to_datetime', 'parse_datetime']

ErrorEvent = namedtuple('ErrorEvent', 'when, message')
"""tuple[datetime.datetime,str]: used to process error lists more easily,
instead of by two-key dictionaries. """

def _syncthing():
    """ Builds a :class:`Syncthing` client from environment variables.

        Reads ``SYNCTHING_API_KEY``, ``SYNCTHING_HOST`` (default
        ``127.0.0.1``), ``SYNCTHING_PORT`` (default ``8384``),
        ``SYNCTHING_HTTPS`` (an integer flag, default ``0``) and
        ``SYNCTHING_CERT_FILE``. The timeout is fixed at ``10.0`` seconds.

        Returns:
            Syncthing
    """
    env = os.getenv
    api_key = env('SYNCTHING_API_KEY')
    hostname = env('SYNCTHING_HOST', '127.0.0.1')
    port_number = env('SYNCTHING_PORT', 8384)
    # the flag is stored as "0"/"1"; go through int() before bool()
    use_https = bool(int(env('SYNCTHING_HTTPS', '0')))
    cert_path = env('SYNCTHING_CERT_FILE')
    return Syncthing(api_key, hostname, port_number, 10.0, use_https,
                     cert_path)


def keys_to_datetime(obj, *keys):
    """ Converts all the keys in an object to DateTime instances.

        Args:
            obj (dict): the JSON-like ``dict`` object to modify inplace.
                Falsy values (``None``, ``{}``) are returned untouched.
            keys (str): keys of the object being converted into DateTime
                instances.

        Returns:
            dict: ``obj`` inplace.

        >>> keys_to_datetime(None) is None
        True
        >>> keys_to_datetime(None, 'one') is None
        True
        >>> keys_to_datetime({})
        {}
        >>> a = {}
        >>> id(keys_to_datetime(a)) == id(a)
        True
        >>> a = {'one': '2016-06-06T19:41:43.039284',
        ...      'two': '2016-06-06T19:41:43.039284'}
        >>> keys_to_datetime(a) == a
        True
        >>> keys_to_datetime(a, 'one')['one']
        datetime.datetime(2016, 6, 6, 19, 41, 43, 39284)
        >>> keys_to_datetime(a, 'one')['two']
        '2016-06-06T19:41:43.039284'
    """
    # nothing to convert: no object at all, an empty one, or no keys requested
    if not obj or not keys:
        return obj
    for k in keys:
        if k not in obj:
            continue
        v = obj[k]
        # only string values are parsed; anything else is left as it is
        if not isinstance(v, string_types):
            continue
        obj[k] = parse_datetime(v)
    return obj
def parse_datetime(s, **kwargs):
    """ Converts a time-string into a valid
    :py:class:`~datetime.datetime.DateTime` object.

        Args:
            s (str): string to be formatted.

        ``**kwargs`` is passed directly to :func:`.dateutil_parser`.

        Returns:
            :py:class:`~datetime.datetime.DateTime`, or ``None`` when ``s``
            is empty or ``None``.

        Raises:
            SyncthingError: when the parser rejects ``s`` with an
                ``OverflowError``, ``TypeError`` or ``ValueError``.
    """
    if not s:
        return None
    try:
        return dateutil_parser(s, **kwargs)
    except (OverflowError, TypeError, ValueError) as e:
        logger.exception(e)
        # wrap ``s`` in a 1-tuple so that a tuple argument cannot break the
        # %-formatting and mask the original parsing error
        reraise('datetime parsing error from %s' % (s,), e)
class SyncthingError(Exception):
    """ Root exception type of this module.

        Errors that are not ``assert`` failures are raised as, or chained
        from, this class.
    """
    pass
class BaseAPI(object):
    """ Placeholder for HTTP REST API URL prefix.

        Subclasses set :attr:`prefix` to the REST path every endpoint of the
        group shares; :meth:`get` and :meth:`post` prepend it automatically.

        Args:
            api_key (str): sent with every request as ``X-API-Key``.
            host (str)
            port (int)
            timeout (float): per-request timeout, in seconds.
            is_https (bool): build ``https://`` URLs instead of ``http://``.
            ssl_cert_file (str): path to a certificate file; must exist.

        Raises:
            SyncthingError: when ``ssl_cert_file`` is given but missing.
    """

    prefix = ''

    def __init__(self, api_key, host='localhost', port=8384,
                 timeout=DEFAULT_TIMEOUT, is_https=False, ssl_cert_file=None):

        if is_https and not ssl_cert_file:
            logger.warning('using https without specifying ssl_cert_file')

        if ssl_cert_file:
            if not os.path.exists(ssl_cert_file):
                raise SyncthingError(
                    'ssl_cert_file does not exist at location, %s' %
                    ssl_cert_file)

        self.api_key = api_key
        self.host = host
        self.is_https = is_https
        self.port = port
        self.ssl_cert_file = ssl_cert_file
        self.timeout = timeout
        # NOTE(review): ``verify`` is only a boolean here and the certificate
        # path is handed to requests as ``cert`` (the client-side
        # certificate) in ``_request`` -- confirm this is intended rather
        # than ``verify=ssl_cert_file``.
        self.verify = bool(ssl_cert_file)

        self._headers = {
            'X-API-Key': api_key
        }

        self.url = '{proto}://{host}:{port}'.format(
            proto='https' if is_https else 'http', host=host, port=port)

        self._base_url = self.url + '{endpoint}'

    def get(self, endpoint, data=None, headers=None, params=None,
            return_response=False, raw_exceptions=False):
        """ Issues a ``GET`` against ``prefix + endpoint``.

            See :meth:`_request` for the arguments and the return value.
        """
        endpoint = self.prefix + endpoint
        return self._request('GET', endpoint, data, headers, params,
                             return_response, raw_exceptions)

    def post(self, endpoint, data=None, headers=None, params=None,
             return_response=False, raw_exceptions=False):
        """ Issues a ``POST`` against ``prefix + endpoint``.

            See :meth:`_request` for the arguments and the return value.
        """
        endpoint = self.prefix + endpoint
        return self._request('POST', endpoint, data, headers, params,
                             return_response, raw_exceptions)

    def _request(self, method, endpoint, data=None, headers=None, params=None,
                 return_response=False, raw_exceptions=False):
        """ Performs the HTTP request and decodes the response.

            Args:
                method (str): ``GET``, ``POST``, ``PUT`` or ``DELETE``.
                endpoint (str): path appended to the base URL.
                data (dict or str): JSON-encoded and sent as the body;
                    defaults to ``{}``.
                headers (dict): extra headers. The dict is copied, so the
                    caller's object is never modified.
                params (dict): query-string parameters.
                return_response (bool): return the raw response object and
                    skip both the status check and the decoding.
                raw_exceptions (bool): let ``requests`` exceptions propagate
                    instead of wrapping them in :class:`SyncthingError`.

            Returns:
                The response object when ``return_response`` is set or the
                status code is not ``200``; otherwise the decoded JSON, or
                the body text when it is not JSON.

            Raises:
                SyncthingError: unsupported verb, request failure (unless
                    ``raw_exceptions``), or a JSON body with an ``error`` key.
        """
        method = method.upper()

        endpoint = self._base_url.format(endpoint=endpoint)

        if method not in ('GET', 'POST', 'PUT', 'DELETE'):
            raise SyncthingError(
                'unsupported http verb requested, %s' % method)

        if data is None:
            data = {}
        assert isinstance(data, string_types) or isinstance(data, dict)

        if headers is None:
            headers = {}
        assert isinstance(headers, dict)

        # merge into a copy: updating the caller's dict in place would leak
        # the API key into (possibly shared) caller-owned state
        headers = dict(headers)
        headers.update(self._headers)

        try:
            resp = requests.request(
                method,
                endpoint,
                data=json.dumps(data),
                params=params,
                timeout=self.timeout,
                verify=self.verify,
                cert=self.ssl_cert_file,
                headers=headers
            )

            if not return_response:
                resp.raise_for_status()

        except requests.RequestException as e:
            if raw_exceptions:
                raise
            logger.exception(e)
            reraise('http request error', e)

        else:
            if return_response:
                return resp

            # raise_for_status() only rejects 4xx/5xx, so other non-200
            # statuses end up here and are handed back undecoded
            if resp.status_code != requests.codes.ok:
                logger.error('%d %s (%s): %s', resp.status_code, resp.reason,
                             resp.url, resp.text)
                return resp

            if 'json' in resp.headers.get('Content-Type', 'text/plain')\
                    .lower():
                json_data = resp.json()
            else:
                content = resp.content.decode('utf-8')
                # some replies carry a JSON object without a JSON content type
                if content and content[0] == '{' and content[-1] == '}':
                    json_data = json.loads(content)
                else:
                    return content

            if isinstance(json_data, dict) and json_data.get('error'):
                api_err = json_data.get('error')
                raise SyncthingError(api_err)
            return json_data
class System(BaseAPI):
    """ Wrapper around the ``/rest/system/`` group of REST endpoints. """

    prefix = '/rest/system/'

    def browse(self, path=None):
        """ Lists the directories matching ``path``.

            Args:
                path (str): glob pattern.

            Returns:
                List[str]
        """
        if not path:
            return self.get('browse', params=None)
        assert isinstance(path, string_types)
        return self.get('browse', params={'current': path})

    def config(self):
        """ Fetches the current configuration.

            Returns:
                dict

            >>> s = _syncthing().system
            >>> config = s.config()
            >>> config
            ... # doctest: +ELLIPSIS
            {...}
            >>> 'version' in config and config['version'] >= 15
            True
            >>> 'folders' in config
            True
            >>> 'devices' in config
            True
        """
        current = self.get('config')
        return current

    def set_config(self, config, and_restart=False):
        """ Posts a complete configuration, in the format returned by
        :func:`.config`, and optionally restarts Syncthing afterwards.

            The configuration will be saved to disk and the ``configInSync``
            flag set to ``False``. Restart Syncthing to activate.

            Args:
                config (dict): the full configuration.
                and_restart (bool): call :meth:`restart` after posting.
        """
        assert isinstance(config, dict)
        self.post('config', data=config)
        if not and_restart:
            return
        self.restart()

    def config_insync(self):
        """ Tells whether the running configuration is the same as the one
        stored on disk.

            Returns:
                bool
        """
        in_sync = self.get('config/insync').get('configInSync', False)
        # an explicit null from the API counts as "not in sync"
        return False if in_sync is None else in_sync

    def connections(self):
        """ Fetches the configured devices and some metadata associated with
        them. The list also contains the local device itself as not
        connected.

            Returns:
                dict

            >>> s = _syncthing().system
            >>> connections = s.connections()
            >>> sorted([k for k in connections.keys()])
            ['connections', 'total']
            >>> isinstance(connections['connections'], dict)
            True
            >>> isinstance(connections['total'], dict)
            True
        """
        return self.get('connections')

    def debug(self):
        """ Fetches the set of debug facilities and which of them are
        currently enabled.

            Returns:
                dict

            >>> s = _syncthing().system
            >>> debug = s.debug()
            >>> debug
            ... #doctest: +ELLIPSIS
            {...}
            >>> len(debug.keys())
            2
            >>> 'enabled' in debug and 'facilities' in debug
            True
            >>> isinstance(debug['enabled'], list) or debug['enabled'] is None
            True
            >>> isinstance(debug['facilities'], dict)
            True
        """
        return self.get('debug')

    def disable_debug(self, *on):
        """ Turns debugging off for the given facilities.

            Args:
                on (str): debugging points to apply ``disable``.

            Returns:
                None
        """
        facilities = ','.join(on)
        self.post('debug', params={'disable': facilities})

    def enable_debug(self, *on):
        """ Turns debugging on for the given facilities.

            Args:
                on (str): debugging points to apply ``enable``.

            Returns:
                None
        """
        facilities = ','.join(on)
        self.post('debug', params={'enable': facilities})

    def discovery(self):
        """ Fetches the contents of the local discovery cache.

            Returns:
                dict
        """
        return self.get('discovery')

    def add_discovery(self, device, address):
        """ Adds an entry to the discovery cache.

            Args:
                device (str): Device ID.
                address (str): destination address, a valid hostname or
                    IP address that's serving a Syncthing instance.

            Returns:
                None
        """
        query = {'device': device, 'address': address}
        self.post('discovery', params=query)

    def clear(self):
        """ Removes all recent errors.

            Returns:
                None
        """
        self.post('error/clear')

    def clear_errors(self):
        """ Alias function for :meth:`.clear`. """
        self.clear()

    def errors(self):
        """ Fetches the list of recent errors.

            Returns:
                list: of :obj:`.ErrorEvent` tuples.
        """
        raw_errors = self.get('error').get('errors', None) or list()
        assert isinstance(raw_errors, list)
        return [
            ErrorEvent(parse_datetime(item.get('when', None)),
                       item.get('message', ''))
            for item in raw_errors
        ]

    def show_error(self, message):
        """ Sends an error message to the active client. The new error will
        be displayed on any active GUI clients.

            Args:
                message (str): Plain-text message to display.

            Returns:
                None

            >>> s = _syncthing()
            >>> s.system.show_error('my error msg')
            >>> s.system.errors()[0]
            ... # doctest: +ELLIPSIS
            ErrorEvent(when=datetime.datetime(...), message='"my error msg"')
            >>> s.system.clear_errors()
            >>> s.system.errors()
            []
        """
        assert isinstance(message, string_types)
        self.post('error', data=message)

    def log(self):
        """ Fetches the list of recent log entries.

            Returns:
                dict
        """
        return self.get('log')

    def pause(self, device):
        """ Pauses the given device.

            Args:
                device (str): Device ID.

            Returns:
                dict: with keys ``success`` and ``error``.
        """
        resp = self.post('pause', params={'device': device},
                         return_response=True)
        # an empty body means there is no error to report
        error = resp.text or None
        succeeded = resp.status_code == requests.codes.ok
        return {'success': succeeded, 'error': error}

    def ping(self, with_method='GET'):
        """ Pings the Syncthing server.

            Args:
                with_method (str): uses a given HTTP method, options are
                    ``GET`` and ``POST``.

            Returns:
                dict
        """
        assert with_method in ('GET', 'POST')
        requester = self.get if with_method == 'GET' else self.post
        return requester('ping')

    def reset(self):
        """ Erases the current index database and restarts Syncthing.

            Returns:
                None
        """
        warnings.warn('This is a destructive action that cannot be undone.')
        self.post('reset', data={})

    def reset_folder(self, folder):
        """ Erases the database index of a given folder and restarts
        Syncthing.

            Args:
                folder (str): Folder ID.

            Returns:
                None
        """
        warnings.warn('This is a destructive action that cannot be undone.')
        self.post('reset', data={}, params={'folder': folder})

    def restart(self):
        """ Immediately restarts Syncthing.

            Returns:
                None
        """
        self.post('restart', data={})

    def resume(self, device):
        """ Resumes the given device.

            Args:
                device (str): Device ID.

            Returns:
                dict: with keys ``success`` and ``error``.
        """
        resp = self.post('resume', params={'device': device},
                         return_response=True)
        # an empty body means there is no error to report
        error = resp.text or None
        succeeded = resp.status_code == requests.codes.ok
        return {'success': succeeded, 'error': error}

    def shutdown(self):
        """ Causes Syncthing to exit and not restart.

            Returns:
                None
        """
        self.post('shutdown', data={})

    def status(self):
        """ Fetches information about current system status and resource
        usage; the ``startTime`` key is converted with
        :func:`keys_to_datetime`.

            Returns:
                dict
        """
        return keys_to_datetime(self.get('status'), 'startTime')

    def upgrade(self):
        """ Checks for a possible upgrade and returns an object describing
        the newest version and upgrade possibility.

            Returns:
                dict
        """
        return self.get('upgrade')

    def can_upgrade(self):
        """ Tells whether there's a newer version than the instance running.

            Returns:
                bool
        """
        info = self.upgrade() or {}
        return info.get('newer', False)

    def do_upgrade(self):
        """ Performs an upgrade to the newest released version and restarts.
        Does nothing if there is no newer version than currently running.

            Returns:
                whatever the ``upgrade`` POST request yields.
        """
        return self.post('upgrade')

    def version(self):
        """ Fetches the current Syncthing version information.

            Returns:
                dict
        """
        return self.get('version')
class Database(BaseAPI):
    """ HTTP REST endpoint for Database calls."""

    prefix = '/rest/db/'

    def browse(self, folder, levels=None, prefix=None):
        """ Returns the directory tree of the global model.

            Directories are always JSON objects (map/dictionary), and files
            are always arrays of modification time and size. The first
            integer is the files modification time, and the second integer
            is the file size.

            Args:
                folder (str): The root folder to traverse.
                levels (int): How deep within the tree we want to dwell down.
                    (0 based, defaults to unlimited depth)
                prefix (str): Defines a prefix within the tree where to start
                    building the structure.

            Returns:
                dict
        """
        assert isinstance(levels, int) or levels is None
        assert isinstance(prefix, string_types) or prefix is None
        return self.get('browse', params={'folder': folder,
                                          'levels': levels,
                                          'prefix': prefix})

    def completion(self, device, folder):
        """ Returns the completion percentage (0 to 100) for a given device
        and folder.

            Args:
                device (str): The Syncthing device the folder is syncing to.
                folder (str): The folder that is being synced.

            Returns:
                int
        """
        return self.get(
            'completion',
            params={'folder': folder, 'device': device}
        ).get('completion', None)

    def file(self, folder, file_):
        """ Returns most data available about a given file, including
        version and availability.

            Args:
                folder (str):
                file_ (str):

            Returns:
                dict
        """
        return self.get('file', params={'folder': folder,
                                        'file': file_})

    def ignores(self, folder):
        """ Returns the content of the ``.stignore`` as the ignore field. A
        second field, expanded, provides a list of strings which represent
        globbing patterns described by gobwas/glob (based on standard
        wildcards) that match the patterns in ``.stignore`` and all the
        includes.

            If appropriate these globs are prepended by the following
            modifiers: ``!`` to negate the glob, ``(?i)`` to do case
            insensitive matching and ``(?d)`` to enable removing of ignored
            files in an otherwise empty directory.

            Args:
                folder (str):

            Returns:
                dict
        """
        return self.get('ignores', params={'folder': folder})

    def set_ignores(self, folder, *patterns):
        """ Applies ``patterns`` to ``folder``'s ``.stignore`` file.

            Nothing is sent (and ``{}`` is returned) when no pattern is
            given.

            Args:
                folder (str):
                patterns (str):

            Returns:
                dict
        """
        if not patterns:
            return {}
        data = {'ignore': list(patterns)}
        return self.post('ignores', params={'folder': folder}, data=data)

    def need(self, folder, page=None, perpage=None):
        """ Returns lists of files which are needed by this device in order
        for it to become in sync.

            Args:
                folder (str):
                page (int): If defined applies pagination across the
                    collection of results.
                perpage (int): If defined applies pagination across the
                    collection of results.

            Returns:
                dict
        """
        assert isinstance(page, int) or page is None
        assert isinstance(perpage, int) or perpage is None
        # the response used to be dropped here, making the call useless
        return self.get('need', params={'folder': folder,
                                        'page': page,
                                        'perpage': perpage})

    def override(self, folder):
        """ Request override of a send-only folder.

            Args:
                folder (str): folder ID.

            Returns:
                dict
        """
        return self.post('override', params={'folder': folder})

    def prio(self, folder, file_):
        """ Moves the file to the top of the download queue.

            Args:
                folder (str):
                file_ (str):

            Returns:
                dict
        """
        return self.post('prio', params={'folder': folder,
                                         'file': file_})

    def scan(self, folder, sub=None, next_=None):
        """ Request immediate rescan of a folder, or a specific path within
        a folder.

            Args:
                folder (str): Folder ID.
                sub (str): Path relative to the folder root. If sub is
                    omitted the entire folder is scanned for changes,
                    otherwise only the given path children are scanned.
                next_ (int): Delays Syncthing's automated rescan interval for
                    a given amount of seconds.

            Returns:
                str
        """
        if not sub:
            sub = ''
        assert isinstance(sub, string_types)
        assert isinstance(next_, int) or next_ is None
        return self.post('scan', params={'folder': folder,
                                         'sub': sub,
                                         'next': next_})

    def status(self, folder):
        """ Returns information about the current status of a folder.

            Note:
                This is an expensive call, increasing CPU and RAM usage on
                the device. Use sparingly.

            Args:
                folder (str): Folder ID.

            Returns:
                dict
        """
        return self.get('status', params={'folder': folder})
class Events(BaseAPI):
    """ HTTP REST endpoints for Event based calls.

        Syncthing provides a simple long polling interface for exposing
        events from the core utility towards a GUI.

        .. code-block:: python

           syncthing = Syncthing()
           event_stream = syncthing.events(limit=5)

           for event in event_stream:
               print(event)
               if event_stream.count > 10:
                   event_stream.stop()

        Args:
            api_key (str)
            last_seen_id (int): id of the last event already processed;
                polling resumes after it. Defaults to ``0``.
            filters (List[str] or str): event types to subscribe to when
                iterating over the instance.
            limit (int): maximum number of events requested per poll.

        Remaining arguments are forwarded to :class:`BaseAPI`; ``timeout``
        defaults to 60 seconds here.
    """

    prefix = '/rest/'

    def __init__(self, api_key, last_seen_id=None, filters=None, limit=None,
                 *args, **kwargs):
        if 'timeout' not in kwargs:
            # increase our timeout to account for long polling.
            # this will reduce the number of timed-out connections, which are
            # swallowed by the library anyway
            kwargs['timeout'] = 60.0  # seconds

        super(Events, self).__init__(api_key, *args, **kwargs)
        self._last_seen_id = last_seen_id or 0
        self._filters = filters
        self._limit = limit

        self._count = 0
        self.blocking = True

    @property
    def count(self):
        """ The number of events that have been processed by this event
        stream.

            Returns:
                int
        """
        return self._count

    def disk_events(self):
        """ Blocking generator of disk related events. Each event is
        represented as a ``dict`` with metadata.

            Returns:
                generator[dict]
        """
        for event in self._events('events/disk', None, self._limit):
            yield event

    def stop(self):
        """ Breaks the while-loop while the generator is polling for event
        changes.

            Returns:
                None
        """
        self.blocking = False

    def _events(self, using_url, filters=None, limit=None):
        """ A long-polling method that queries Syncthing for events.

            Args:
                using_url (str): REST HTTP endpoint
                filters (List[str]): Creates an "event group" in Syncthing to
                    only receive events that have been subscribed to.
                limit (int): The number of events to query in the history
                    to catch up to the current state.

            Returns:
                generator[dict]

            Raises:
                SyncthingError: on any failure other than a timeout or a
                    connection error, which are swallowed and retried.
        """

        # coerce
        if not isinstance(limit, (int, NoneType)):
            limit = None

        # coerce
        if filters is None:
            filters = []

        # format our list into the correct expectation of string with commas
        if isinstance(filters, string_types):
            filters = filters.split(',')

        # reset the state if the loop was broken with `stop`
        if not self.blocking:
            self.blocking = True

        # block/long-poll for updates to the events api
        while self.blocking:
            params = {
                'since': self._last_seen_id,
                'limit': limit,
            }

            if filters:
                params['events'] = ','.join(map(str, filters))

            try:
                data = self.get(using_url, params=params,
                                raw_exceptions=True)
            except (ConnectTimeout, ConnectionError,
                    requests.exceptions.Timeout):
                # swallow timeout errors for long polling. A poll that simply
                # sees no event in time fails with a *read* timeout, which is
                # not a ConnectionError, hence the generic Timeout class.
                data = None
            except Exception as e:
                reraise('error while polling for events', e)

            if data:
                # update our last_seen_id to move our event counter forward
                self._last_seen_id = data[-1]['id']
                for event in data:
                    # handle potentially multiple events returned in a list
                    self._count += 1
                    yield event

    def __iter__(self):
        """ Helper interface for :obj:`._events` """
        for event in self._events('events', self._filters, self._limit):
            yield event
class Statistics(BaseAPI):
    """ Wrapper around the ``/rest/stats/`` group of REST endpoints. """

    prefix = '/rest/stats/'

    def device(self):
        """ Fetches general statistics about devices.

            Currently, only contains the time the device was last seen.

            Returns:
                dict
        """
        device_stats = self.get('device')
        return device_stats

    def folder(self):
        """ Fetches general statistics about folders.

            Currently contains the last scan time and the last synced file.

            Returns:
                dict
        """
        folder_stats = self.get('folder')
        return folder_stats
class Misc(BaseAPI):
    """ Wrapper around the miscellaneous ``/rest/svc/`` REST endpoints. """

    prefix = '/rest/svc/'

    def device_id(self, id_):
        """ Verifies and formats a device ID.

            Accepts all currently valid formats (52 or 56 characters with or
            without separators, upper or lower case, with trivial
            substitutions). Takes one parameter, id, and returns either a
            valid device ID in modern format, or an error.

            Args:
                id_ (str)

            Raises:
                SyncthingError: when ``id_`` is an invalid length.

            Returns:
                str
        """
        reply = self.get('deviceid', params={'id': id_})
        return reply.get('id')

    def language(self):
        """ Fetches a list of canonicalized localization codes, as picked up
        from the Accept-Language header sent by the browser.

            By default, this API will return a single element that's empty;
            however calling :func:`Misc.get` directly with `lang` you can
            set specific headers to get values back as intended.

            Returns:
                List[str]

            >>> s = _syncthing()
            >>> len(s.misc.language())
            1
            >>> s.misc.language()[0]
            ''
            >>> s.misc.get('lang', headers={'Accept-Language': 'en-us'})
            ['en-us']
        """
        return self.get('lang')

    def random_string(self, length=32):
        """ Fetches a strong random generated string (alphanumeric) of the
        specified length.

            Args:
                length (int): default ``32``.

            Returns:
                str

            >>> s = _syncthing()
            >>> len(s.misc.random_string())
            32
            >>> len(s.misc.random_string(32))
            32
            >>> len(s.misc.random_string(1))
            1
            >>> len(s.misc.random_string(0))
            32
            >>> len(s.misc.random_string(None))
            32
            >>> import string
            >>> all_letters = string.ascii_letters + string.digits
            >>> all([c in all_letters for c in s.misc.random_string(128)])
            True
            >>> all([c in all_letters for c in s.misc.random_string(1024)])
            True
        """
        reply = self.get('random/string', params={'length': length})
        return reply.get('random', None)

    def report(self):
        """ Fetches the data sent in the anonymous usage report.

            Returns:
                dict

            >>> s = _syncthing()
            >>> report = s.misc.report()
            >>> 'version' in report
            True
            >>> 'longVersion' in report
            True
            >>> 'syncthing v' in report['longVersion']
            True
        """
        return self.get('report')
class Syncthing(object):
    """ Entry point bundling every REST endpoint group of one Syncthing
    server instance.

        Args:
            api_key (str)
            host (str)
            port (int)
            timeout (float)
            is_https (bool)
            ssl_cert_file (str)

        Attributes:
            system: instance of :class:`.System`.
            database: instance of :class:`.Database`.
            stats: instance of :class:`.Statistics`.
            misc: instance of :class:`.Misc`.

        Note:
            - attribute :attr:`.db` is an alias of :attr:`.database`
            - attribute :attr:`.sys` is an alias of :attr:`.system`
    """

    def __init__(self, api_key, host='localhost', port=8384,
                 timeout=DEFAULT_TIMEOUT, is_https=False, ssl_cert_file=None):
        # connection settings shared by every endpoint group; kept around so
        # that ``events()`` can build its instance lazily
        shared = dict(host=host, port=port, timeout=timeout,
                      is_https=is_https, ssl_cert_file=ssl_cert_file)
        self.__kwargs = shared
        self.__api_key = api_key

        self.api_key = api_key
        self.host = host
        self.port = port
        self.timeout = timeout
        self.is_https = is_https
        self.ssl_cert_file = ssl_cert_file

        self.system = System(api_key, **shared)
        self.sys = self.system
        self.database = Database(api_key, **shared)
        self.db = self.database
        self.stats = Statistics(api_key, **shared)
        self.misc = Misc(api_key, **shared)

    def events(self, last_seen_id=None, filters=None, **kwargs):
        """ Creates a new :class:`Events` stream for this server.

            Args:
                last_seen_id (int): passed through to :class:`Events`.
                filters: passed through to :class:`Events`.

            ``**kwargs`` override the connection settings given to the
            constructor and are passed on to :class:`Events`.

            Returns:
                Events
        """
        merged = dict(self.__kwargs, **kwargs)
        return Events(api_key=self.__api_key, last_seen_id=last_seen_id,
                      filters=filters, **merged)
# Running this file as a script executes the doctests embedded in the
# docstrings above.
if __name__ == "__main__":
    import doctest

    doctest.testmod()