Beispiele

Teilweise getestet mit Python 3.5; benötigt die Bibliotheken requests und pycrypto.

Beispielbenutzung

Aufrufe der Funktionen von sn_rpc.py

Events für Gruppe 4 abfragen:

get_data_by_global_id("4",{"events":{}})

Events für Gruppe 4 und 3 abfragen:

get_data_by_global_id(["4","3"],{"events":{}})

Events für Gruppe 4 abfragen, die vor dem 12.01.2012 liegen:

get_data_by_global_id("4",{"events":{"before":"12.01.2012"}})

Info über Gruppe 4 abfragen:

get_data_by_global_id("4",{"index":{}})

Übergeordnete Gruppe zu Gruppe 4 suchen:

get_data_by_global_id("4",{"index":{}})

Beispielclient

sn_rpc.py

import os
import json
import hashlib
import time
import base64
import requests
import Crypto.Cipher.AES


# JSON-RPC endpoint that the wrapper functions in this module pass to rpc().
SN_RPC_URL = "https://www.scoutnet.de/jsonrpc/server.php"

# Initialisation vector handed to AES-CBC in generate_auth().
# NOTE(review): a constant IV is presumably dictated by the server side;
# confirm before changing it.
SN_RPC_IV = b'1234567890123456'


class RPCError(Exception):
    """Raised by rpc() when the server response carries a non-null error."""


def rpc(url, method, *params, timeout=30):
    """
    Perform a JSON-RPC 1.0 call over HTTP(S) POST and return its result.

    Args:
        url: Endpoint the request is POSTed to.
        method: Name of the remote method.
        *params: Positional arguments of the remote method; sent as the
            JSON "params" array.
        timeout: Keyword-only. Seconds ``requests`` waits for the server
            before giving up; pass ``None`` to wait indefinitely.

    Returns:
        The "result" member of the decoded JSON response.

    Raises:
        RPCError: If the response carries a non-null "error" member.
    """
    payload = {
        "method": method,
        "params": list(params),
        # Random 128-bit request id, hex encoded.
        "id": os.urandom(16).hex(),
    }
    # Without an explicit timeout, requests blocks forever when the server
    # accepts the connection but never answers.
    response = requests.post(url, json=payload, timeout=timeout)
    got = response.json()
    # Use .get() so a reply that omits "error" on success is treated like
    # an explicit null instead of failing with a bare KeyError.
    error = got.get("error")
    if error is not None:
        raise RPCError(error)
    return got["result"]


def get_data_by_global_id(global_id, query=None):
    """
    Call the remote ``get_data_by_global_id`` method and return its result.

    Args:
        global_id: A group id as a string (e.g. ``"4"``) or a list of such
            ids (e.g. ``["4", "3"]``).
        query: Dict describing what to fetch, e.g. ``{"events": {}}`` or
            ``{"events": {"before": "12.01.2012"}}``. Defaults to an empty
            dict.

    Returns:
        Whatever rpc() returns for this call.

    Raises:
        RPCError: Propagated from rpc() when the server reports an error.
    """
    # A None sentinel avoids sharing one mutable default dict across calls.
    if query is None:
        query = {}
    return rpc(SN_RPC_URL, "get_data_by_global_id", global_id, query)


def checkPermission(type, global_id, username, auth):
    """
    Call the remote ``checkPermission`` method and return its result.

    All four arguments are forwarded unchanged, in order, as the remote
    method's parameters.

    Args:
        type: First remote argument (name shadows the builtin; kept for
            interface compatibility).
        global_id: Second remote argument.
        username: Third remote argument.
        auth: Fourth remote argument; presumably a token produced by
            generate_auth() -- TODO confirm.

    Raises:
        RPCError: Propagated from rpc() when the server reports an error.
    """
    arguments = (type, global_id, username, auth)
    return rpc(SN_RPC_URL, "checkPermission", *arguments)


def requestPermission(type, global_id, username, auth):
    """
    Call the remote ``requestPermission`` method and return its result.

    All four arguments are forwarded unchanged, in order, as the remote
    method's parameters.

    Args:
        type: First remote argument (name shadows the builtin; kept for
            interface compatibility).
        global_id: Second remote argument.
        username: Third remote argument.
        auth: Fourth remote argument; presumably a token produced by
            generate_auth() -- TODO confirm.

    Raises:
        RPCError: Propagated from rpc() when the server reports an error.
    """
    arguments = (type, global_id, username, auth)
    return rpc(SN_RPC_URL, "requestPermission", *arguments)


def setData(type, id, object, username, auth):
    """
    Call the remote ``setData`` method and return its result.

    The five arguments are forwarded unchanged, in order, as the remote
    method's parameters.

    Args:
        type: First remote argument.
        id: Second remote argument.
        object: Third remote argument.
        username: Fourth remote argument.
        auth: Fifth remote argument; presumably a token produced by
            generate_auth() -- TODO confirm.

    Raises:
        RPCError: Propagated from rpc() when the server reports an error.
    """
    # The parameter names shadow builtins but are part of the public
    # signature, so they are kept as they are.
    # The remote method must be "setData"; the previous code invoked
    # "requestPermission" here, a copy-paste slip.
    return rpc(SN_RPC_URL, "setData", type, id, object, username, auth)


def deleteObject(type, global_id, id, username, auth):
    """
    Call the remote ``deleteObject`` method and return its result.

    The five arguments are forwarded unchanged, in order, as the remote
    method's parameters.

    Args:
        type: First remote argument.
        global_id: Second remote argument.
        id: Third remote argument.
        username: Fourth remote argument.
        auth: Fifth remote argument; presumably a token produced by
            generate_auth() -- TODO confirm.

    Raises:
        RPCError: Propagated from rpc() when the server reports an error.
    """
    # The parameter names shadow builtins but are part of the public
    # signature, so they are kept as they are.
    # The remote method must be "deleteObject"; the previous code invoked
    # "requestPermission" here, a copy-paste slip.
    return rpc(
        SN_RPC_URL, "deleteObject", type, global_id, id, username, auth
    )


def generate_auth(api_key, check_value):
    """
    Build the encrypted, URL-safe auth token for ``check_value``.

    The plaintext is 16 random bytes followed by a JSON object holding the
    SHA-1 and MD5 hex digests of ``check_value`` and the current
    ``time.time()`` value. It is zero-padded, encrypted with AES-CBC
    (key ``api_key``, IV ``SN_RPC_IV``) and base64-encoded, with ``+``,
    ``/`` and ``=`` replaced by ``-``, ``_`` and ``~``.

    Args:
        api_key: Key handed to ``Crypto.Cipher.AES.new``.
        check_value: Bytes to hash, e.g. the output of
            generate_check_value().

    Returns:
        The token as ``bytes``.
    """
    payload = json.dumps({
        "sha1": hashlib.sha1(check_value).hexdigest(),
        "md5": hashlib.md5(check_value).hexdigest(),
        "time": time.time(),
    }).encode("ascii")

    # A random leading block makes the ciphertext differ between calls
    # even though the IV is constant.
    plaintext = os.urandom(16) + payload

    cipher = Crypto.Cipher.AES.new(
        api_key, Crypto.Cipher.AES.MODE_CBC, SN_RPC_IV
    )

    # Zero padding: always appends 1..16 NUL bytes, i.e. one full extra
    # block when the data is already block-aligned.
    pad_length = 16 - len(plaintext) % 16
    plaintext += bytes(pad_length)

    encoded = base64.b64encode(cipher.encrypt(plaintext))
    # Swap the three base64 characters in a single pass.
    return encoded.translate(bytes.maketrans(b'+/=', b'-_~'))


def generate_check_value(*args):
    """
    Concatenate the ``str()`` form of every argument and encode as ASCII.

    Returns:
        The concatenation as ``bytes``; empty when called without arguments.

    Raises:
        UnicodeEncodeError: If the concatenated text is not pure ASCII.
    """
    text = "".join(map(str, args))
    return text.encode("ascii")