kore

a fork of the worlds most advanced web framework
Log | Files | Refs | README | LICENSE

commit 20a0103f1e9a01bf22b6e35ecc02c1921e86b17b
parent c12f2967439ca2dea1f868f23e2a78e184cf4705
Author: Joris Vink <joris@coders.se>
Date:   Mon, 15 Oct 2018 20:18:54 +0200

Add async/await support for socket i/o in python.

This means you can now do things like:

	resp = await koresock.recv(1024)
	await koresock.send(resp)

directly from page handlers if they are defined as async.

Adds lots more to the python goo such as fatalx(), bind_unix(),
task_create() and socket_wrap().

Diffstat:
MMakefile | 2++
Minclude/kore/http.h | 5++---
Minclude/kore/kore.h | 2++
Minclude/kore/python_api.h | 2++
Minclude/kore/python_methods.h | 96+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Msrc/bsd.c | 6++++++
Msrc/http.c | 11+++++------
Msrc/kore.c | 15++++++++++++---
Msrc/python.c | 691++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Msrc/worker.c | 3+++
10 files changed, 782 insertions(+), 51 deletions(-)

diff --git a/Makefile b/Makefile @@ -110,6 +110,8 @@ else ifeq ("$(OSNAME)", "linux") CFLAGS+=-D_GNU_SOURCE=1 -D_FORTIFY_SOURCE=2 LDFLAGS+=-ldl S_SRC+=src/linux.c + CFLAGS+=-I/home/joris/crypto/openssl/openssl-1.0.2o/include + LDFLAGS+=-L/home/joris/crypto/openssl/openssl-1.0.2o else S_SRC+=src/bsd.c ifneq ("$(JSONRPC)", "") diff --git a/include/kore/http.h b/include/kore/http.h @@ -243,13 +243,12 @@ struct http_request { size_t state_len; char *query_string; struct kore_module_handle *hdlr; - - u_int8_t http_body_digest[HTTP_BODY_DIGEST_LEN]; - #if defined(KORE_USE_PYTHON) void *py_coro; #endif + u_int8_t http_body_digest[HTTP_BODY_DIGEST_LEN]; + LIST_HEAD(, kore_task) tasks; LIST_HEAD(, kore_pgsql) pgsqls; diff --git a/include/kore/kore.h b/include/kore/kore.h @@ -160,6 +160,7 @@ TAILQ_HEAD(netbuf_head, netbuf); #define KORE_TYPE_CONNECTION 2 #define KORE_TYPE_PGSQL_CONN 3 #define KORE_TYPE_TASK 4 +#define KORE_TYPE_PYSOCKET 5 #define CONN_STATE_UNKNOWN 0 #define CONN_STATE_TLS_SHAKE 1 @@ -556,6 +557,7 @@ void kore_platform_event_init(void); void kore_platform_event_cleanup(void); void kore_platform_proctitle(char *); void kore_platform_disable_read(int); +void kore_platform_disable_write(int); void kore_platform_enable_accept(void); void kore_platform_disable_accept(void); int kore_platform_event_wait(u_int64_t); diff --git a/include/kore/python_api.h b/include/kore/python_api.h @@ -25,7 +25,9 @@ void kore_python_init(void); void kore_python_cleanup(void); +void kore_python_coro_run(void); void kore_python_path(const char *); +void kore_python_coro_delete(void *); PyObject *kore_python_callable(PyObject *, const char *); diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h @@ -16,7 +16,11 @@ static PyObject *python_kore_log(PyObject *, PyObject *); static PyObject *python_kore_fatal(PyObject *, PyObject *); -static PyObject *python_kore_listen(PyObject *, PyObject *); +static PyObject *python_kore_fatalx(PyObject *, PyObject *); +static PyObject *python_kore_bind(PyObject *, PyObject *); +static PyObject *python_kore_bind_unix(PyObject *, PyObject *); +static PyObject *python_kore_task_create(PyObject *, PyObject *); +static PyObject *python_kore_socket_wrap(PyObject *, PyObject *); #if defined(KORE_USE_PGSQL) static PyObject *python_kore_pgsql_register(PyObject *, PyObject *); @@ -32,7 +36,11 @@ static PyObject *python_websocket_broadcast(PyObject *, PyObject *); static struct PyMethodDef pykore_methods[] = { METHOD("log", python_kore_log, METH_VARARGS), METHOD("fatal", python_kore_fatal, METH_VARARGS), - METHOD("listen", python_kore_listen, METH_VARARGS), + METHOD("fatalx", python_kore_fatalx, METH_VARARGS), + METHOD("bind", python_kore_bind, METH_VARARGS), + METHOD("bind_unix", python_kore_bind_unix, METH_VARARGS), + METHOD("task_create", python_kore_task_create, METH_VARARGS), + METHOD("socket_wrap", python_kore_socket_wrap, METH_VARARGS), METHOD("websocket_broadcast", python_websocket_broadcast, METH_VARARGS), #if defined(KORE_USE_PGSQL) METHOD("register_database", python_kore_pgsql_register, METH_VARARGS), @@ -44,6 +52,90 @@ static struct PyModuleDef pykore_module = { PyModuleDef_HEAD_INIT, "kore", NULL, -1, pykore_methods }; +struct pysocket { + PyObject_HEAD + int fd; + int family; + int protocol; + PyObject *socket; + socklen_t addr_len; + union { + struct sockaddr_in ipv4; + struct sockaddr_un sun; + } addr; +}; + +static PyObject *pysocket_send(struct pysocket *, PyObject *); +static PyObject *pysocket_recv(struct pysocket *, PyObject *); +static PyObject *pysocket_close(struct pysocket *, PyObject *); +static PyObject *pysocket_accept(struct pysocket *, PyObject *); +static PyObject *pysocket_connect(struct pysocket *, PyObject *); + +static PyMethodDef pysocket_methods[] = { + METHOD("recv", pysocket_recv, METH_VARARGS), + METHOD("send", pysocket_send, METH_VARARGS), + METHOD("close", pysocket_close, METH_NOARGS), + METHOD("accept", pysocket_accept, METH_NOARGS), + METHOD("connect", pysocket_connect, METH_VARARGS), + METHOD(NULL, NULL, -1), +}; + +static void pysocket_dealloc(struct pysocket *); + +static PyTypeObject pysocket_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.socket", + .tp_doc = "kore socket implementation", + .tp_methods = pysocket_methods, + .tp_basicsize = sizeof(struct pysocket), + .tp_dealloc = (destructor)pysocket_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + +#define PYSOCKET_TYPE_ACCEPT 1 +#define PYSOCKET_TYPE_CONNECT 2 +#define PYSOCKET_TYPE_RECV 3 +#define PYSOCKET_TYPE_SEND 4 + +struct pysocket_data { + struct kore_event evt; + int fd; + int type; + void *self; + void *coro; + int state; + size_t length; + struct kore_buf buffer; + struct pysocket *socket; +}; + +struct pysocket_op { + PyObject_HEAD + struct pysocket_data data; +}; + +static void pysocket_op_dealloc(struct pysocket_op *); + +static PyObject *pysocket_op_await(PyObject *); +static PyObject *pysocket_op_iternext(struct pysocket_op *); + +static PyAsyncMethods pysocket_op_async = { + (unaryfunc)pysocket_op_await, + NULL, + NULL +}; + +static PyTypeObject pysocket_op_type = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "kore.socketop", + .tp_doc = "socket operation", + .tp_as_async = &pysocket_op_async, + .tp_iternext = (iternextfunc)pysocket_op_iternext, + .tp_basicsize = sizeof(struct pysocket_op), + .tp_dealloc = (destructor)pysocket_op_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, +}; + struct pyconnection { PyObject_HEAD struct connection *c; diff --git a/src/bsd.c b/src/bsd.c @@ -214,6 +214,12 @@ kore_platform_disable_read(int fd) } void +kore_platform_disable_write(int fd) +{ + kore_platform_event_schedule(fd, EVFILT_WRITE, EV_DELETE, NULL); +} + +void kore_platform_proctitle(char *title) { #ifndef __MACH__ diff --git a/src/http.c b/src/http.c @@ -391,7 +391,10 @@ http_request_free(struct http_request *req) #endif #if defined(KORE_USE_PYTHON) - Py_XDECREF(req->py_coro); + if (req->py_coro != NULL) { + kore_python_coro_delete(req->py_coro); + req->py_coro = NULL; + } #endif #if defined(KORE_USE_PGSQL) while (!LIST_EMPTY(&(req->pgsqls))) { @@ -1212,7 +1215,7 @@ http_body_rewind(struct http_request *req) req->http_body_path, errno_s); return (KORE_RESULT_ERROR); } - } else { + } else if (req->http_body != NULL) { kore_buf_reset(req->http_body); } @@ -1480,10 +1483,6 @@ http_request_new(struct connection *c, const char *host, req->http_body_offset = 0; req->http_body_path = NULL; -#if defined(KORE_USE_PYTHON) - req->py_coro = NULL; -#endif - req->host = host; req->path = path; diff --git a/src/kore.c b/src/kore.c @@ -335,21 +335,30 @@ int kore_server_bind_unix(const char *path, const char *ccb) { struct listener *l; + int len; struct sockaddr_un sun; + socklen_t socklen; memset(&sun, 0, sizeof(sun)); sun.sun_family = AF_UNIX; - if (kore_strlcpy(sun.sun_path, path, sizeof(sun.sun_path)) >= - sizeof(sun.sun_path)) { + len = snprintf(sun.sun_path, sizeof(sun.sun_path), "%s", path); + if (len == -1 || (size_t)len >= sizeof(sun.sun_path)) { kore_log(LOG_ERR, "unix socket path '%s' too long", path); return (KORE_RESULT_ERROR); } +#if defined(__linux__) + if (sun.sun_path[0] == '@') + sun.sun_path[0] = '\0'; +#endif + + socklen = sizeof(sun.sun_family) + len; + if ((l = kore_listener_alloc(AF_UNIX, ccb)) == NULL) return (KORE_RESULT_ERROR); - if (bind(l->fd, (struct sockaddr *)&sun, sizeof(sun)) == -1) { + if (bind(l->fd, (struct sockaddr *)&sun, socklen) == -1) { kore_log(LOG_ERR, "bind: %s", errno_s); kore_listener_free(l); return (KORE_RESULT_ERROR); diff --git a/src/python.c b/src/python.c @@ -30,6 +30,17 @@ #include "python_api.h" #include "python_methods.h" +#define CORO_STATE_RUNNABLE 1 +#define CORO_STATE_SUSPENDED 2 + +struct python_coro { + int state; + int error; + PyObject *obj; + struct http_request *request; + TAILQ_ENTRY(python_coro) list; +}; + static PyMODINIT_FUNC python_module_init(void); static PyObject *python_import(const char *); static void python_log_error(const char *); @@ -39,13 +50,25 @@ static PyObject *python_callable(PyObject *, const char *); static PyObject *pyhttp_file_alloc(struct http_file *); static PyObject *pyhttp_request_alloc(const struct http_request *); +static struct python_coro *python_coro_create(PyObject *); +static int python_coro_run(struct python_coro *); +static void python_coro_wakeup(struct python_coro *); + +static void pysocket_evt_handle(void *, int); +static PyObject *pysocket_op_create(struct pysocket *, + int, const void *, size_t); + +static PyObject *pysocket_async_recv(struct pysocket_op *); +static PyObject *pysocket_async_send(struct pysocket_op *); +static PyObject *pysocket_async_accept(struct pysocket_op *); +static PyObject *pysocket_async_connect(struct pysocket_op *); + #if defined(KORE_USE_PGSQL) static PyObject *pykore_pgsql_alloc(struct http_request *, const char *, const char *); #endif static void python_append_path(const char *); -static int python_coroutine_run(struct http_request *); static void python_push_integer(PyObject *, const char *, long); static void python_push_type(const char *, PyObject *, PyTypeObject *); @@ -127,9 +150,26 @@ static PyMemAllocatorEx allocator = { .free = python_free }; +static struct kore_pool coro_pool; +static int coro_count; +static TAILQ_HEAD(, python_coro) coro_runnable; +static TAILQ_HEAD(, python_coro) coro_suspended; + +extern const char *__progname; + +/* XXX */ +static struct http_request *req_running = NULL; +static struct python_coro *coro_running = NULL; + void kore_python_init(void) { + coro_count = 0; + TAILQ_INIT(&coro_runnable); + TAILQ_INIT(&coro_suspended); + + kore_pool_init(&coro_pool, "coropool", sizeof(struct python_coro), 100); + PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &allocator); PyMem_SetAllocator(PYMEM_DOMAIN_MEM, &allocator); PyMem_SetAllocator(PYMEM_DOMAIN_RAW, &allocator); @@ -156,6 +196,37 @@ kore_python_path(const char *path) python_append_path(path); } +void +kore_python_coro_run(void) +{ + struct python_coro *coro, *next; + + for (coro = TAILQ_FIRST(&coro_runnable); coro != NULL; coro = next) { + next = TAILQ_NEXT(coro, list); + if (coro->state != CORO_STATE_RUNNABLE) + fatal("non-runnable coro on coro_runnable"); + if (python_coro_run(coro) == KORE_RESULT_OK) + kore_python_coro_delete(coro); + } +} + +void +kore_python_coro_delete(void *obj) +{ + struct python_coro *coro; + + coro = obj; + coro_count--; + Py_DECREF(coro->obj); + + if (coro->state == CORO_STATE_RUNNABLE) + TAILQ_REMOVE(&coro_runnable, coro, list); + else + TAILQ_REMOVE(&coro_suspended, coro, list); + + kore_pool_put(&coro_pool, coro); +} + static void * python_malloc(void *ctx, size_t len) { @@ -244,37 +315,50 @@ python_module_getsym(struct kore_module *module, const char *symbol) return (python_callable(module->handle, symbol)); } -static void -pyconnection_dealloc(struct pyconnection *pyc) +static struct python_coro * +python_coro_create(PyObject *obj) { - PyObject_Del((PyObject *)pyc); -} + struct python_coro *coro; -static void -pyhttp_dealloc(struct pyhttp_request *pyreq) -{ - Py_XDECREF(pyreq->data); - PyObject_Del((PyObject *)pyreq); -} + if (!PyCoro_CheckExact(obj)) + fatal("%s: object is not a coroutine", __func__); -static void -pyhttp_file_dealloc(struct pyhttp_file *pyfile) -{ - PyObject_Del((PyObject *)pyfile); + coro = kore_pool_get(&coro_pool); + coro_count++; + + coro->obj = obj; + coro->error = 0; + coro->request = req_running; + coro->state = CORO_STATE_RUNNABLE; + + TAILQ_INSERT_HEAD(&coro_runnable, coro, list); + + if (coro->request != NULL) + http_request_sleep(coro->request); + + return (coro); } static int -python_coroutine_run(struct http_request *req) +python_coro_run(struct python_coro *coro) { PyObject *item; + if (coro->state != CORO_STATE_RUNNABLE) + fatal("non-runnable coro attempted to run"); + + coro_running = coro; + for (;;) { PyErr_Clear(); - item = _PyGen_Send((PyGenObject *)req->py_coro, NULL); + + if (coro->error) + PyErr_SetString(PyExc_RuntimeError, "i/o error"); + + item = _PyGen_Send((PyGenObject *)coro->obj, NULL); if (item == NULL) { python_log_error("coroutine"); - Py_DECREF(req->py_coro); - req->py_coro = NULL; + coro_running = NULL; return (KORE_RESULT_OK); } @@ -286,16 +370,66 @@ python_coroutine_run(struct http_request *req) Py_DECREF(item); } + coro->state = CORO_STATE_SUSPENDED; + TAILQ_REMOVE(&coro_runnable, coro, list); + TAILQ_INSERT_HEAD(&coro_suspended, coro, list); + + coro_running = NULL; + + if (coro->request != NULL) + http_request_sleep(coro->request); + return (KORE_RESULT_RETRY); } +static void +python_coro_wakeup(struct python_coro *coro) +{ + if (coro->state != CORO_STATE_SUSPENDED) + return; + + coro->state = CORO_STATE_RUNNABLE; + TAILQ_REMOVE(&coro_suspended, coro, list); + TAILQ_INSERT_HEAD(&coro_runnable, coro, list); +} + +static void +pyconnection_dealloc(struct pyconnection *pyc) +{ + PyObject_Del((PyObject *)pyc); +} + +static void +pyhttp_dealloc(struct pyhttp_request *pyreq) +{ + Py_XDECREF(pyreq->data); + PyObject_Del((PyObject *)pyreq); +} + +static void +pyhttp_file_dealloc(struct pyhttp_file *pyfile) +{ + PyObject_Del((PyObject *)pyfile); +} + static int python_runtime_http_request(void *addr, struct http_request *req) { PyObject *pyret, *pyreq, *args, *callable; - if (req->py_coro != NULL) - return (python_coroutine_run(req)); + req_running = req; + + if (req->py_coro != NULL) { + python_coro_wakeup(req->py_coro); + if (python_coro_run(req->py_coro) == KORE_RESULT_OK) { + kore_python_coro_delete(req->py_coro); + req->py_coro = NULL; + req_running = NULL; + return (KORE_RESULT_OK); + } + req_running = NULL; + return (KORE_RESULT_RETRY); + } callable = (PyObject *)addr; @@ -319,14 +453,24 @@ python_runtime_http_request(void *addr, struct http_request *req) } if (PyCoro_CheckExact(pyret)) { - req->py_coro = pyret; - return (python_coroutine_run(req)); + http_request_sleep(req); + req->py_coro = python_coro_create(pyret); + req_running = NULL; + /* XXX merge with the above python_coro_run() block. */ + if (python_coro_run(req->py_coro) == KORE_RESULT_OK) { + kore_python_coro_delete(req->py_coro); + req->py_coro = NULL; + req_running = NULL; + return (KORE_RESULT_OK); + } + return (KORE_RESULT_RETRY); } if (pyret != Py_None) fatal("python_runtime_http_request: unexpected return type"); Py_DECREF(pyret); + req_running = NULL; return (KORE_RESULT_OK); } @@ -448,24 +592,37 @@ static void python_runtime_configure(void *addr, int argc, char **argv) { int i; - PyObject *callable, *args, *pyret, *pyarg; + PyObject *callable, *args, *pyret, *pyarg, *list; callable = (PyObject *)addr; - if ((args = PyTuple_New(argc)) == NULL) + if ((args = PyTuple_New(1)) == NULL) fatal("python_runtime_configure: PyTuple_New failed"); + if ((list = PyList_New(argc + 1)) == NULL) + fatal("python_runtime_configure: PyList_New failed"); + + if ((pyarg = PyUnicode_FromString(__progname)) == NULL) + fatal("python_runtime_configure: PyUnicode_FromString"); + + if (PyList_SetItem(list, 0, pyarg) == -1) + fatal("python_runtime_configure: PyList_SetItem"); + for (i = 0; i < argc; i++) { if ((pyarg = PyUnicode_FromString(argv[i])) == NULL) fatal("python_runtime_configure: PyUnicode_FromString"); - if (PyTuple_SetItem(args, i, pyarg) != 0) - fatal("python_runtime_configure: PyTuple_SetItem"); + if (PyList_SetItem(list, i + 1, pyarg) == -1) + fatal("python_runtime_configure: PyList_SetItem"); } + if (PyTuple_SetItem(args, 0, list) != 0) + fatal("python_runtime_configure: PyTuple_SetItem"); + PyErr_Clear(); pyret = PyObject_Call(callable, args, NULL); Py_DECREF(args); + Py_DECREF(list); if (pyret == NULL) { python_log_error("python_runtime_configure"); @@ -547,6 +704,8 @@ python_module_init(void) if ((pykore = PyModule_Create(&pykore_module)) == NULL) fatal("python_module_init: failed to setup pykore module"); + python_push_type("pysocket", pykore, &pysocket_type); + python_push_type("pysocket_op", pykore, &pysocket_op_type); python_push_type("pyconnection", pykore, &pyconnection_type); for (i = 0; python_integers[i].symbol != NULL; i++) { @@ -626,7 +785,7 @@ python_kore_log(PyObject *self, PyObject *args) } static PyObject * -python_kore_listen(PyObject *self, PyObject *args) +python_kore_bind(PyObject *self, PyObject *args) { const char *ip, *port; @@ -642,6 +801,93 @@ python_kore_listen(PyObject *self, PyObject *args) } static PyObject * +python_kore_bind_unix(PyObject *self, PyObject *args) +{ + const char *path; + + if (!PyArg_ParseTuple(args, "s", &path)) + return (NULL); + + if (!kore_server_bind_unix(path, NULL)) { + PyErr_SetString(PyExc_RuntimeError, "failed bind to path"); + return (NULL); + } + + Py_RETURN_TRUE; +} + +static PyObject * +python_kore_task_create(PyObject *self, PyObject *args) +{ + PyObject *obj; + + if (!PyArg_ParseTuple(args, "O", &obj)) + return (NULL); + + if (!PyCoro_CheckExact(obj)) + fatal("%s: object is not a coroutine", __func__); + + python_coro_create(obj); + Py_INCREF(obj); + + Py_RETURN_NONE; +} + +static PyObject * +python_kore_socket_wrap(PyObject *self, PyObject *args) +{ + struct pysocket *sock; + PyObject *pysock, *pyfd, *pyfam, *pyproto; + + sock = NULL; + pyfd = NULL; + pyfam = NULL; + pyproto = NULL; + + if (!PyArg_ParseTuple(args, "O", &pysock)) + return (NULL); + + if ((pyfd = PyObject_CallMethod(pysock, "fileno", NULL)) == NULL) + return (NULL); + + if ((pyfam = PyObject_GetAttrString(pysock, "family")) == NULL) + goto out; + + if ((pyproto = PyObject_GetAttrString(pysock, "proto")) == NULL) + goto out; + + if ((sock = PyObject_New(struct pysocket, &pysocket_type)) == NULL) + goto out; + + sock->socket = pysock; + Py_INCREF(sock->socket); + + sock->fd = (int)PyLong_AsLong(pyfd); + sock->family = (int)PyLong_AsLong(pyfam); + sock->protocol = (int)PyLong_AsLong(pyproto); + + memset(&sock->addr, 0, sizeof(sock->addr)); + + switch (sock->family) { + case AF_INET: + case AF_UNIX: + break; + default: + PyErr_SetString(PyExc_RuntimeError, "unsupported family"); + Py_DECREF((PyObject *)sock); + sock = NULL; + goto out; + } + +out: + Py_XDECREF(pyfd); + Py_XDECREF(pyfam); + Py_XDECREF(pyproto); + + return ((PyObject *)sock); +} + +static PyObject * python_kore_fatal(PyObject *self, PyObject *args) { const char *reason; @@ -656,6 +902,20 @@ python_kore_fatal(PyObject *self, PyObject *args) } static PyObject * +python_kore_fatalx(PyObject *self, PyObject *args) +{ + const char *reason; + + if (!PyArg_ParseTuple(args, "s", &reason)) + reason = "python_kore_fatalx: PyArg_ParseTuple failed"; + + fatalx("%s", reason); + + /* not reached */ + Py_RETURN_TRUE; +} + +static PyObject * python_import(const char *path) { PyObject *module; @@ -760,6 +1020,369 @@ pyconnection_get_addr(struct pyconnection *pyc, void *closure) return (result); } +static void +pysocket_dealloc(struct pysocket *sock) +{ + PyObject_Del((PyObject *)sock); +} + +static PyObject * +pysocket_send(struct pysocket *sock, PyObject *args) +{ + Py_buffer buf; + + if (!PyArg_ParseTuple(args, "y*", &buf)) + return (NULL); + + return (pysocket_op_create(sock, PYSOCKET_TYPE_SEND, buf.buf, buf.len)); +} + +static PyObject * +pysocket_recv(struct pysocket *sock, PyObject *args) +{ + Py_ssize_t len; + + if (!PyArg_ParseTuple(args, "n", &len)) + return (NULL); + + return (pysocket_op_create(sock, PYSOCKET_TYPE_RECV, NULL, len)); +} + +static PyObject * +pysocket_accept(struct pysocket *sock, PyObject *args) +{ + return (pysocket_op_create(sock, PYSOCKET_TYPE_ACCEPT, NULL, 0)); +} + +static PyObject * +pysocket_connect(struct pysocket *sock, PyObject *args) +{ + const char *host; + int port, len; + + port = 0; + + if (!PyArg_ParseTuple(args, "s|i", &host, &port)) + return (NULL); + + if (port < 0 || port > USHRT_MAX) { + PyErr_SetString(PyExc_RuntimeError, "invalid port number"); + return (NULL); + } + + switch (sock->family) { + case AF_INET: + sock->addr.ipv4.sin_family = AF_INET; + sock->addr.ipv4.sin_port = htons(port); + if (inet_pton(sock->family, host, + &sock->addr.ipv4.sin_addr) == -1) { + PyErr_SetString(PyExc_RuntimeError, "invalid host"); + return (NULL); + } + sock->addr_len = sizeof(sock->addr.ipv4); + break; + case AF_UNIX: + sock->addr.sun.sun_family = AF_UNIX; + len = snprintf(sock->addr.sun.sun_path, + sizeof(sock->addr.sun.sun_path), "%s", host); + if (len == -1 || + (size_t)len >= sizeof(sock->addr.sun.sun_path)) { + PyErr_SetString(PyExc_RuntimeError, "path too long"); + return (NULL); + } +#if defined(__linux__) + /* Assume abstract socket if prefixed with '@'. */ + if (sock->addr.sun.sun_path[0] == '@') + sock->addr.sun.sun_path[0] = '\0'; +#endif + sock->addr_len = sizeof(sock->addr.sun.sun_family) + len; + break; + default: + fatal("unsupported socket family %d", sock->family); + } + + return (pysocket_op_create(sock, PYSOCKET_TYPE_CONNECT, NULL, 0)); +} + +static PyObject * +pysocket_close(struct pysocket *sock, PyObject *args) +{ + if (sock->socket != NULL) { + (void)PyObject_CallMethod(sock->socket, "close", NULL); + Py_DECREF(sock->socket); + } else if (sock->fd != -1) { + (void)close(sock->fd); + } + + Py_RETURN_TRUE; +} + +static void +pysocket_op_dealloc(struct pysocket_op *op) +{ +#if defined(__linux__) + kore_platform_disable_read(op->data.fd); + close(op->data.fd); +#else + switch (op->data.type) { + case PYSOCKET_TYPE_RECV: + case PYSOCKET_TYPE_ACCEPT: + kore_platform_disable_read(op->data.fd); + break; + case PYSOCKET_TYPE_SEND: + case PYSOCKET_TYPE_CONNECT: + kore_platform_disable_write(op->data.fd); + break; + default: + fatal("unknown pysocket_op type %u", op->data.type); + } +#endif + + if (op->data.type == PYSOCKET_TYPE_RECV || + op->data.type == PYSOCKET_TYPE_SEND) + kore_buf_cleanup(&op->data.buffer); + + Py_DECREF(op->data.socket); + PyObject_Del((PyObject *)op); +} + +static PyObject * +pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len) +{ + struct pysocket_op *op; + + op = PyObject_New(struct pysocket_op, &pysocket_op_type); + if (op == NULL) + return (NULL); + +#if defined(__linux__) + /* + * Duplicate the socket so each pysocket_op gets its own unique + * descriptor for epoll. This is so we can easily call EPOLL_CTL_DEL + * on the fd when the pysocket_op is finished as our event code + * does not track queued events. + */ + if ((op->data.fd = dup(sock->fd)) == -1) + fatal("dup: %s", errno_s); +#else + op->data.fd = sock->fd; +#endif + + op->data.self = op; + op->data.type = type; + op->data.socket = sock; + op->data.evt.flags = 0; + op->data.coro = coro_running; + op->data.evt.type = KORE_TYPE_PYSOCKET; + op->data.evt.handle = pysocket_evt_handle; + + Py_INCREF(op->data.socket); + + switch (type) { + case PYSOCKET_TYPE_RECV: + op->data.evt.flags |= KORE_EVENT_READ; + kore_buf_init(&op->data.buffer, len); + kore_platform_schedule_read(op->data.fd, &op->data); + break; + case PYSOCKET_TYPE_SEND: + op->data.evt.flags |= KORE_EVENT_WRITE; + kore_buf_init(&op->data.buffer, len); + kore_buf_append(&op->data.buffer, ptr, len); + kore_buf_reset(&op->data.buffer); + kore_platform_schedule_write(op->data.fd, &op->data); + break; + case PYSOCKET_TYPE_ACCEPT: + op->data.evt.flags |= KORE_EVENT_READ; + kore_platform_schedule_read(op->data.fd, &op->data); + break; + case PYSOCKET_TYPE_CONNECT: + op->data.evt.flags |= KORE_EVENT_WRITE; + kore_platform_schedule_write(op->data.fd, &op->data); + break; + default: + fatal("unknown pysocket_op type %u", type); + } + + return ((PyObject *)op); +} + +static PyObject * +pysocket_op_await(PyObject *obj) +{ + Py_INCREF(obj); + return (obj); +} + +static PyObject * +pysocket_op_iternext(struct pysocket_op *op) +{ + PyObject *ret; + + switch (op->data.type) { + case PYSOCKET_TYPE_CONNECT: + ret = pysocket_async_connect(op); + break; + case PYSOCKET_TYPE_ACCEPT: + ret = pysocket_async_accept(op); + break; + case PYSOCKET_TYPE_RECV: + ret = pysocket_async_recv(op); + break; + case PYSOCKET_TYPE_SEND: + ret = pysocket_async_send(op); + break; + default: + PyErr_SetString(PyExc_RuntimeError, "invalid op type"); + return (NULL); + } + + return (ret); +} + +static PyObject * +pysocket_async_connect(struct pysocket_op *op) +{ + if (connect(op->data.fd, (struct sockaddr *)&op->data.socket->addr, + op->data.socket->addr_len) == -1) { + if (errno != EALREADY && errno != EINPROGRESS && + errno != EISCONN) { + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + if (errno != EISCONN) { + Py_RETURN_NONE; + } + } + + PyErr_SetNone(PyExc_StopIteration); + return (NULL); +} + +static PyObject * +pysocket_async_accept(struct pysocket_op *op) +{ + int fd; + struct pysocket *sock; + + if ((sock = PyObject_New(struct pysocket, &pysocket_type)) == NULL) + return (NULL); + + sock->addr_len = sizeof(sock->addr); + + if ((fd = accept(op->data.fd, + (struct sockaddr *)&sock->addr, &sock->addr_len)) == -1) { + Py_DECREF((PyObject *)sock); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + Py_RETURN_NONE; + } + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + if (!kore_connection_nonblock(fd, 0)) { + Py_DECREF((PyObject *)sock); + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + sock->fd = fd; + sock->socket = NULL; + sock->family = op->data.socket->family; + sock->protocol = op->data.socket->protocol; + + PyErr_SetObject(PyExc_StopIteration, (PyObject *)sock); + Py_DECREF((PyObject *)sock); + + return (NULL); +} + +static PyObject * +pysocket_async_recv(struct pysocket_op *op) +{ + ssize_t ret; + const char *ptr; + PyObject *bytes; + + if (!(op->data.evt.flags & KORE_EVENT_READ)) { + Py_RETURN_NONE; + } + + ret = read(op->data.fd, op->data.buffer.data, op->data.buffer.length); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + op->data.evt.flags &= ~KORE_EVENT_READ; + Py_RETURN_NONE; + } + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + if (ret == 0) { + PyErr_SetNone(PyExc_StopIteration); + return (NULL); + } + + ptr = (const char *)op->data.buffer.data; + + bytes = PyBytes_FromStringAndSize(ptr, ret); + if (bytes != NULL) + PyErr_SetObject(PyExc_StopIteration, bytes); + + return (NULL); +} + +static PyObject * +pysocket_async_send(struct pysocket_op *op) +{ + ssize_t ret; + + if (!(op->data.evt.flags & KORE_EVENT_WRITE)) { + Py_RETURN_NONE; + } + + ret = write(op->data.fd, op->data.buffer.data + op->data.buffer.offset, + op->data.buffer.length - op->data.buffer.offset); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + op->data.evt.flags &= ~KORE_EVENT_WRITE; + Py_RETURN_NONE; + } + PyErr_SetString(PyExc_RuntimeError, errno_s); + return (NULL); + } + + op->data.buffer.offset += (size_t)ret; + + if (op->data.buffer.offset == op->data.buffer.length) { + PyErr_SetNone(PyExc_StopIteration); + return (NULL); + } + + Py_RETURN_NONE; +} + +static void +pysocket_evt_handle(void *arg, int error) +{ + struct pysocket_data *data = arg; + struct python_coro *coro = data->coro; + + /* + * If we are a coroutine tied to an HTTP request wake-up the + * HTTP request instead. That in turn will wakeup the coro and + * continue it. + * + * Otherwise just wakeup the coroutine so it will run next tick. + */ + if (coro->request != NULL) + http_request_wakeup(coro->request); + else + python_coro_wakeup(coro); + + coro->error = error; +} + static PyObject * pyhttp_request_alloc(const struct http_request *req) { @@ -860,10 +1483,8 @@ pyhttp_body_read(struct pyhttp_request *pyreq, PyObject *args) PyObject *result; u_int8_t buf[1024]; - if (!PyArg_ParseTuple(args, "n", &pylen) || pylen < 0) { - PyErr_SetString(PyExc_TypeError, "invalid parameters"); + if (!PyArg_ParseTuple(args, "n", &pylen) || pylen < 0) return (NULL); - } len = (size_t)pylen; if (len > sizeof(buf)) { @@ -986,10 +1607,8 @@ pyhttp_file_read(struct pyhttp_file *pyfile, PyObject *args) PyObject *result; u_int8_t buf[1024]; - if (!PyArg_ParseTuple(args, "n", &pylen) || pylen < 0) { - PyErr_SetString(PyExc_TypeError, "invalid parameters"); + if (!PyArg_ParseTuple(args, "n", &pylen) || pylen < 0) return (NULL); - } len = (size_t)pylen; if (len > sizeof(buf)) { @@ -1095,10 +1714,8 @@ python_websocket_broadcast(PyObject *self, PyObject *args) if (pysrc == Py_None) { c = NULL; } else { - if (!PyObject_TypeCheck(pysrc, &pyconnection_type)) { - PyErr_SetString(PyExc_TypeError, "invalid parameters"); + if (!PyObject_TypeCheck(pysrc, &pyconnection_type)) return (NULL); - } pyc = (struct pyconnection *)pysrc; c = pyc->c; } diff --git a/src/worker.c b/src/worker.c @@ -448,6 +448,9 @@ kore_worker_entry(struct kore_worker *kw) #if !defined(KORE_NO_HTTP) http_process(); #endif +#if defined(KORE_USE_PYTHON) + kore_python_coro_run(); +#endif if (next_prune <= now) { kore_connection_check_timeout(now);