Source code for jasy.http.Request

# Jasy - Web Tooling Framework
# Copyright 2010-2012 Zynga Inc.
# Copyright 2013-2014 Sebastian Werner

import shutil
import json
import base64
import os
import re
import random
import sys
import mimetypes
import http.client
import urllib.parse
import hashlib
import jasy.core.Console as Console

__all__ = ("requestUrl", "uploadData")

# Generic HTTP support

[docs]def requestUrl(url, content_type="text/plain", headers=None, method="GET", port=None, body="", user=None, password=None): """Generic HTTP request wrapper with support for basic authentification and automatic parsing of response content.""""Opening %s request to %s..." % (method, url)) parsed = urllib.parse.urlparse(url) if parsed.scheme == "http": request = http.client.HTTPConnection(parsed.netloc) elif parsed.scheme == "https": request = http.client.HTTPSConnection(parsed.netloc) else: raise Exception("Unsupported url: %s" % url) if parsed.query: request.putrequest(method, parsed.path + "?" + parsed.query) else: request.putrequest(method, parsed.path) request.putheader("Content-Type", content_type) request.putheader("Content-Length", str(len(body))) if user is not None and password is not None: auth = "Basic %s" % base64.b64encode(("%s:%s" % (user, password)).encode("utf-8")).decode("utf-8") request.putheader("Authorization", auth) request.endheaders() if body:"Sending data (%s bytes)..." % len(body)) else:"Sending request...") Console.indent() request.send(body) response = request.getresponse() res_code = int(response.getcode()) res_headers = dict(response.getheaders()) res_content = res_success = False if res_code >= 200 and res_code <= 300: Console.debug("HTTP Success!") res_success = True else: Console.error("HTTP Failure Code: %s!", res_code) if "Content-Type" in res_headers: res_type = res_headers["Content-Type"] if ";" in res_type: res_type = res_type.split(";")[0] if res_type in ("application/json", "text/html", "text/plain"): res_content = res_content.decode("utf-8") if res_type == "application/json": res_content = json.loads(res_content) if "error" in res_content: Console.error("Error %s: %s", res_content["error"], res_content["reason"]) elif "reason" in res_content:"Success: %s" % res_content["reason"]) Console.outdent() return res_success, res_headers, res_content
# # Multipart Support #
[docs]def uploadData(url, fields, files, user=None, password=None, method="POST"): """Easy wrapper for uploading content via HTTP multi part.""" content_type, body = encode_multipart_formdata(fields, files) return requestUrl(url, body=body, content_type=content_type, method=method, user=user, password=password)
def choose_boundary(): """Return a string usable as a multipart boundary.""" # Follow IE and Firefox nonce = "".join([str(random.randint(0, sys.maxsize - 1)) for i in (0, 1, 2)]) return "-" * 27 + nonce def get_content_type(filename): """Figures out the content type of the given file.""" return mimetypes.guess_type(filename)[0] or "application/octet-stream" def encode_multipart_formdata(fields, files): """Encodes given fields and files to a multipart ready HTTP body.""" # Choose random boundary boundary = choose_boundary() # Build HTTP content type with generated boundary content_type = "multipart/form-data; boundary=%s" % boundary # Join all fields and files into one collection of lines lines = [] for (key, value) in fields: lines.append("--" + boundary) lines.append('Content-Disposition: form-data; name="' + key + '"') lines.append("") lines.append(value) for (key, filename, value) in files: lines.append("--" + boundary) lines.append('Content-Disposition: form-data; name="' + key + '"; filename="' + filename + '"') lines.append('Content-Type: ' + get_content_type(filename)) lines.append("") lines.append(value) lines.append("--" + boundary + "--") lines.append("") # Encode and join all lines as ascii bytelines = [line if isinstance(line, bytes) else line.encode("ascii") for line in lines] body = "\r\n".encode("ascii").join(bytelines) return content_type, body