module nngd.nngd;

import core.memory;
import core.time;
import core.stdc.string;
import core.stdc.stdlib;
import std.conv;
import std.string;
import std.typecons;
import std.algorithm;
import std.datetime.systime;
import std.traits;
import std.json;
import std.file;
import std.path;
import std.exception;
import std.array;
import std.utf;

private import libnng;

import std.stdio;

/// Returns a pointer to element `off` of `arr`, or `null` when `arr` is empty.
@safe
T* ptr(T)(T[] arr, size_t off = 0) pure nothrow {
    return arr.length == 0 ? null : &arr[off];
}

alias nng_errno = libnng.nng_errno;
alias nng_errstr = libnng.nng_errstr;
alias toString = nng_errstr;

/// Sleeps for `val`, converted to whole milliseconds, via `nng_msleep`.
@safe
void nng_sleep(Duration val) nothrow {
    nng_msleep(cast(nng_duration) val.total!"msecs");
}

/// Renders a socket address as a human-readable tag; unknown families give "<ADDR:UNKNOWN>".
string toString(nng_sockaddr a) {
    string s = "<ADDR:UNKNOWN>";
    switch (a.s_family) {
    case nng_sockaddr_family.NNG_AF_NONE:
        s = "<ADDR:NONE>";
        break;
    case nng_sockaddr_family.NNG_AF_UNSPEC:
        s = "<ADDR:UNSPEC>";
        break;
    case nng_sockaddr_family.NNG_AF_INPROC:
        s = format("<ADDR:INPROC name: %s >", a.s_inproc.sa_name);
        break;
    case nng_sockaddr_family.NNG_AF_IPC:
        s = format("<ADDR:IPC path: %s >", a.s_ipc.sa_path);
        break;
    case nng_sockaddr_family.NNG_AF_INET:
        s = format("<ADDR:INET addr: %u port: %u >", a.s_in.sa_addr, a.s_in.sa_port);
        break;
    case nng_sockaddr_family.NNG_AF_INET6:
        s = format("<ADDR:INET6 scope: %u addr: %s port: %u >", a.s_in6.sa_scope, a.s_in6.sa_addr, a.s_in6.sa_port);
        break;
    case nng_sockaddr_family.NNG_AF_ZT:
        s = format("<ADDR:ZT nwid: %u nodeid: %u port: %u >", a.s_zt.sa_nwid, a.s_zt.sa_nodeid, a.s_zt.sa_port);
        break;
    case nng_sockaddr_family.NNG_AF_ABSTRACT:
        s = format("<ADDR:ABSTRACT name: %s >", cast(string) a.s_abstract.sa_name[0 .. a.s_abstract.sa_len]);
        break;
    default:
        break;
    }
    return s;
}

/// Sentinel used by this module for "no timeout" / "duration unavailable".
enum infiniteDuration = Duration.max;

/// Protocol selected when an `NNGSocket` is opened.
enum nng_socket_type {
    NNG_SOCKET_BUS,
    NNG_SOCKET_PAIR,
    NNG_SOCKET_PULL,
    NNG_SOCKET_PUSH,
    NNG_SOCKET_PUB,
    NNG_SOCKET_SUB,
    NNG_SOCKET_REQ,
    NNG_SOCKET_REP,
    NNG_SOCKET_SURVEYOR,
    NNG_SOCKET_RESPONDENT
}

/// Lifecycle state tracked in `NNGSocket.m_state`.
enum nng_socket_state {
    NNG_STATE_NONE = 0,
    NNG_STATE_CREATED = 1,
    NNG_STATE_PREPARED = 2,
    NNG_STATE_CONNECTED = 4,
    NNG_STATE_ERROR = 16
}

/// Selects which handle (socket, dialer or listener) an option is read from.
enum nng_property_base {
    NNG_BASE_SOCKET,
    NNG_BASE_DIALER,
    NNG_BASE_LISTENER
}

/**
 * Wrapper around an `nng_msg*`.
 *
 * The destructor calls `nng_msg_free` on the held pointer and the copy
 * constructor duplicates the message with `nng_msg_dup`.
 */
struct NNGMessage {
    nng_msg* msg;

    @disable this();

    /// Copy constructor: duplicates the source message.
    this(ref return scope NNGMessage src) {
        auto rc = nng_msg_dup(&msg, src.pointer);
        enforce(rc == 0);
    }

    /// Wraps `msgref`; a null `msgref` allocates a fresh empty message instead.
    this(nng_msg* msgref) {
        if (msgref is null) {
            auto rc = nng_msg_alloc(&msg, 0);
            enforce(rc == 0);
        }
        else {
            msg = msgref;
        }
    }

    /// Allocates a message with a body of `size` bytes.
    this(size_t size) {
        auto rc = nng_msg_alloc(&msg, size);
        enforce(rc == 0);
    }

    ~this() {
        nng_msg_free(msg);
    }

    @nogc @safe
    @property nng_msg* pointer() nothrow {
        return msg;
    }

    /// Replaces the held pointer with `p`; a null `p` clears the current message body instead.
    /// The previously held message is not freed here.
    @nogc
    @property void pointer(nng_msg* p) nothrow {
        if (p !is null) {
            msg = p;
        }
        else {
            nng_msg_clear(msg);
        }
    }

    @nogc @safe
    @property void* bodyptr() nothrow {
        return nng_msg_body(msg);
    }

    @nogc @safe
    @property void* headerptr() nothrow {
        return nng_msg_header(msg);
    }

    /// Body length in bytes.
    @property size_t length() {
        return nng_msg_len(msg);
    }

    /// Resizes the body to `sz` bytes.
    @property void length(size_t sz) {
        auto rc = nng_msg_realloc(msg, sz);
        enforce(rc == 0);
    }

    /// Header length in bytes.
    @property size_t header_length() {
        return nng_msg_header_len(msg);
    }

    /// Clears the message body.
    void clear() {
        nng_msg_clear(msg);
    }

    /**
     * Appends `data` to the end of the body.
     * Accepts byte-sized-element arrays or an unsigned integer (1, 2, 4 or 8 bytes).
     * Returns: 0; throws on a non-zero nng return code.
     */
    int body_append(T)(const(T) data) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            static assert((ForeachType!T).sizeof == 1, "None byte size array element are not supported");
            auto rc = nng_msg_append(msg, ptr(data), data.length);
            enforce(rc == 0);
            return 0;
        }
        else {
            static if (T.sizeof == 1) {
                T tmp = data;
                auto rc = nng_msg_append(msg, cast(void*)&tmp, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_append_u16(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_append_u32(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_append_u64(msg, data);
                enforce(rc == 0);
            }
            return 0;
        }
    }

    /**
     * Inserts `data` at the front of the body.
     * Accepts byte-sized-element arrays or an unsigned integer (1, 2, 4 or 8 bytes).
     * Returns: 0; throws on a non-zero nng return code.
     */
    int body_prepend(T)(const(T) data) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            static assert((ForeachType!T).sizeof == 1, "None byte size array element are not supported");
            auto rc = nng_msg_insert(msg, ptr(data), data.length);
            enforce(rc == 0);
            return 0;
        }
        else {
            static if (T.sizeof == 1) {
                T tmp = data;
                auto rc = nng_msg_insert(msg, cast(void*)&tmp, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_insert_u16(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_insert_u32(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_insert_u64(msg, data);
                enforce(rc == 0);
            }
            return 0;
        }
    }

    /**
     * Removes data from the end of the body and returns it.
     *
     * For arrays, `size == 0` means "the whole body"; an empty body yields `[]`.
     * The returned array is a copy, so it stays valid after the message is
     * modified or freed.
     */
    T body_chop(T)(size_t size = 0) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            if (size == 0)
                size = length;
            if (size == 0)
                return [];
            // Guard the pointer arithmetic below against size_t underflow.
            enforce(size <= length);
            T data = cast(T)((bodyptr + (length - size))[0 .. size].dup);
            auto rc = nng_msg_chop(msg, size);
            enforce(rc == 0);
            return data;
        }
        else {
            T tmp;
            static if (T.sizeof == 1) {
                enforce(length >= 1);
                // Read through a typed pointer: a void* cannot be dereferenced.
                tmp = *cast(T*)(bodyptr + (length - 1));
                auto rc = nng_msg_chop(msg, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_chop_u16(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_chop_u32(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_chop_u64(msg, &tmp);
                enforce(rc == 0);
            }
            return tmp;
        }
    }

    /**
     * Removes data from the front of the body and returns it.
     *
     * For arrays, `size == 0` means "the whole body"; an empty body yields `[]`.
     * The returned array is a copy of the removed bytes.
     */
    T body_trim(T)(size_t size = 0) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            if (size == 0)
                size = length;
            if (size == 0)
                return [];
            enforce(size <= length);
            T data = cast(T)((bodyptr)[0 .. size].dup);
            auto rc = nng_msg_trim(msg, size);
            enforce(rc == 0);
            return data;
        }
        else {
            T tmp;
            static if (T.sizeof == 1) {
                enforce(length >= 1);
                tmp = *cast(T*)(bodyptr);
                auto rc = nng_msg_trim(msg, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_trim_u16(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_trim_u32(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_trim_u64(msg, &tmp);
                enforce(rc == 0);
            }
            return tmp;
        }
    }

    // TODO: body structure map

    /**
     * Appends `data` to the end of the header.
     * Returns: 0; throws on a non-zero nng return code.
     */
    int header_append(T)(const(T) data) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            static assert((ForeachType!T).sizeof == 1, "None byte size array element are not supported");
            auto rc = nng_msg_header_append(msg, ptr(data), data.length);
            // Check the result like every sibling method does.
            enforce(rc == 0);
            return 0;
        }
        else {
            static if (T.sizeof == 1) {
                T tmp = data;
                auto rc = nng_msg_header_append(msg, cast(void*)&tmp, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_header_append_u16(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_header_append_u32(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_header_append_u64(msg, data);
                enforce(rc == 0);
            }
            return 0;
        }
    }

    /**
     * Inserts `data` at the front of the header.
     * Returns: 0; throws on a non-zero nng return code.
     */
    int header_prepend(T)(const(T) data) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            static assert((ForeachType!T).sizeof == 1, "None byte size array element are not supported");
            auto rc = nng_msg_header_insert(msg, ptr(data), data.length);
            enforce(rc == 0);
            return 0;
        }
        else {
            static if (T.sizeof == 1) {
                T tmp = data;
                auto rc = nng_msg_header_insert(msg, cast(void*)&tmp, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_header_insert_u16(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_header_insert_u32(msg, data);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_header_insert_u64(msg, data);
                enforce(rc == 0);
            }
            return 0;
        }
    }

    /**
     * Removes data from the end of the header and returns it.
     *
     * For arrays, `size == 0` means "the whole header"; an empty header yields `[]`.
     * The returned array is a copy of the removed bytes.
     */
    T header_chop(T)(size_t size = 0) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            if (size == 0)
                size = header_length;
            if (size == 0)
                return [];
            enforce(size <= header_length);
            T data = cast(T)((headerptr + (header_length - size))[0 .. size].dup);
            auto rc = nng_msg_header_chop(msg, size);
            enforce(rc == 0);
            return data;
        }
        else {
            T tmp;
            static if (T.sizeof == 1) {
                enforce(header_length >= 1);
                // Read the last HEADER byte (not the last body byte).
                tmp = *cast(T*)(headerptr + (header_length - 1));
                auto rc = nng_msg_header_chop(msg, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_header_chop_u16(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_header_chop_u32(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_header_chop_u64(msg, &tmp);
                enforce(rc == 0);
            }
            return tmp;
        }
    }

    /**
     * Removes data from the front of the header and returns it.
     *
     * For arrays, `size == 0` means "the whole header"; an empty header yields `[]`.
     * The returned array is a copy of the removed bytes.
     */
    T header_trim(T)(size_t size = 0) if (isArray!T || isUnsigned!T) {
        static if (isArray!T) {
            if (size == 0)
                size = header_length;
            if (size == 0)
                return [];
            enforce(size <= header_length);
            T data = cast(T)((headerptr)[0 .. size].dup);
            auto rc = nng_msg_header_trim(msg, size);
            enforce(rc == 0);
            return data;
        }
        else {
            T tmp;
            static if (T.sizeof == 1) {
                enforce(header_length >= 1);
                // Read the first HEADER byte (not the first body byte).
                tmp = *cast(T*)(headerptr);
                auto rc = nng_msg_header_trim(msg, 1);
                enforce(rc == 0);
            }
            static if (T.sizeof == 2) {
                auto rc = nng_msg_header_trim_u16(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 4) {
                auto rc = nng_msg_header_trim_u32(msg, &tmp);
                enforce(rc == 0);
            }
            static if (T.sizeof == 8) {
                auto rc = nng_msg_header_trim_u64(msg, &tmp);
                enforce(rc == 0);
            }
            return tmp;
        }
    }
} // struct NNGMessage

alias nng_aio_cb = void function(void*);

/**
 * Wrapper around an `nng_aio*`; the destructor calls `nng_aio_free`.
 */
struct NNGAio {
    nng_aio* aio;

    @disable this();

    /// Allocates an aio with completion callback `cb` and callback argument `arg`.
    this(nng_aio_cb cb, void* arg) {
        auto rc = nng_aio_alloc(&aio, cb, arg);
        enforce(rc == 0);
    }

    /// Adopts an existing, non-null aio handle.
    this(nng_aio* src) {
        enforce(src !is null);
        pointer(src);
    }

    ~this() {
        nng_aio_free(aio);
    }

    /// Frees the current aio and allocates a new one with the given callback.
    void realloc(nng_aio_cb cb, void* arg) {
        nng_aio_free(aio);
        auto rc = nng_aio_alloc(&aio, cb, arg);
        enforce(rc == 0);
    }

    // ---------- pointer prop

    @nogc @safe
    @property nng_aio* pointer() {
        return aio;
    }

    /// Frees the current aio, then adopts `p`; a null `p` allocates a callback-less aio instead.
    @nogc
    @property void pointer(nng_aio* p) {
        if (p !is null) {
            nng_aio_free(aio);
            aio = p;
        }
        else {
            nng_aio_free(aio);
            nng_aio_alloc(&aio, null, null);
        }
    }

    // ---------- status prop

    @nogc @safe
    @property size_t count() nothrow {
        return nng_aio_count(aio);
    }

    @nogc @safe
    @property nng_errno result() nothrow {
        return cast(nng_errno) nng_aio_result(aio);
    }

    /// Sets the aio timeout in milliseconds; `infiniteDuration` is passed as -1
    /// (NNG's infinite duration) instead of being truncated to an arbitrary int.
    @nogc @safe
    @property void timeout(Duration val) nothrow {
        nng_aio_set_timeout(aio, val == infiniteDuration ? cast(nng_duration)-1 : cast(nng_duration) val.total!"msecs");
    }

    // ---------- controls

    bool begin() {
        return nng_aio_begin(aio);
    }

    void wait() {
        nng_aio_wait(aio);
    }

    void sleep(Duration val) {
        nng_sleep_aio(cast(nng_duration) val.total!"msecs", aio);
    }

    /*
        = no callback
    */
    void abort(nng_errno err) {
        nng_aio_abort(aio, cast(int) err);
    }

    /*
        = callback
    */
    void finish(nng_errno err) {
        nng_aio_finish(aio, cast(int) err);
    }

    alias nng_aio_ccb = void function(nng_aio*, void*, int);
    void defer(nng_aio_ccb cancelcb, void* arg) {
        nng_aio_defer(aio, cancelcb, arg);
    }

    /*
        = abort(NNG_CANCELLED)
        = no callback
        = no wait for abort and callback complete
    */
    void cancel() {
        nng_aio_cancel(aio);
    }

    /*
        = abort(NNG_CANCELLED)
        = no callback
        = wait for abort and callback complete
    */
    void stop() {
        nng_aio_stop(aio);
    }

    // ---------- messages

    /**
     * Stores the aio's message pointer into `msg`.
     * Returns: the aio result if it is not NNG_OK, NNG_EINTERNAL when the aio
     * holds no message, otherwise NNG_OK.
     */
    nng_errno get_msg(ref NNGMessage msg) {
        auto err = this.result();
        if (err != nng_errno.NNG_OK)
            return err;
        nng_msg* m = nng_aio_get_msg(aio);
        if (m is null) {
            return nng_errno.NNG_EINTERNAL;
        }
        else {
            msg.pointer(m);
            return nng_errno.NNG_OK;
        }
    }

    void set_msg(ref NNGMessage msg) {
        nng_aio_set_msg(aio, msg.pointer);
    }

    void clear_msg() {
        nng_aio_set_msg(aio, null);
    }

    // TODO: IOV and context input-output parameters
} // struct NNGAio

/**
 * Wrapper around an `nng_socket` with its dialer/listener handles.
 *
 * Most methods return 0 on success, the nng error code on failure (also stored
 * in `m_errno`), or -1 when the socket is not in the state the call requires.
 */
struct NNGSocket {
    nng_socket_type m_type;
    nng_socket_state m_state;
    nng_socket m_socket;
    nng_ctx[] m_ctx;
    string[] m_subscriptions;
    string m_name;
    nng_errno m_errno;

    bool m_raw;
    bool m_may_send, m_may_recv;

    nng_listener m_listener;
    nng_dialer m_dialer;

    @disable this();

    /// Opens a socket of protocol `itype` (raw mode when `iraw` is set) and
    /// records success or failure in `m_state` / `m_errno`.
    this(nng_socket_type itype, bool iraw = false) nothrow {
        int rc;
        m_type = itype;
        m_raw = iraw;
        m_state = nng_socket_state.NNG_STATE_NONE;
        with (nng_socket_type) {
            final switch (itype) {
            case NNG_SOCKET_BUS:
                rc = (!raw) ? nng_bus_open(&m_socket) : nng_bus_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = true;
                break;
            case NNG_SOCKET_PAIR:
                rc = (!raw) ? nng_pair_open(&m_socket) : nng_pair_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = true;
                break;
            case NNG_SOCKET_PULL:
                rc = (!raw) ? nng_pull_open(&m_socket) : nng_pull_open_raw(&m_socket);
                m_may_send = false;
                m_may_recv = true;
                break;
            case NNG_SOCKET_PUSH:
                rc = (!raw) ? nng_push_open(&m_socket) : nng_push_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = false;
                break;
            case NNG_SOCKET_PUB:
                rc = (!raw) ? nng_pub_open(&m_socket) : nng_pub_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = false;
                break;
            case NNG_SOCKET_SUB:
                rc = (!raw) ? nng_sub_open(&m_socket) : nng_sub_open_raw(&m_socket);
                m_may_send = false;
                m_may_recv = true;
                break;
            case NNG_SOCKET_REQ:
                rc = (!raw) ? nng_req_open(&m_socket) : nng_req_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = true;
                break;
            case NNG_SOCKET_REP:
                rc = (!raw) ? nng_rep_open(&m_socket) : nng_rep_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = true;
                break;
            case NNG_SOCKET_SURVEYOR:
                rc = (!raw) ? nng_surveyor_open(&m_socket) : nng_surveyor_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = true;
                break;
            case NNG_SOCKET_RESPONDENT:
                rc = (!raw) ? nng_respondent_open(&m_socket) : nng_respondent_open_raw(&m_socket);
                m_may_send = true;
                m_may_recv = true;
                break;
            }
        }
        if (rc != 0) {
            m_state = nng_socket_state.NNG_STATE_ERROR;
            m_errno = cast(nng_errno) rc;
        }
        else {
            m_state = nng_socket_state.NNG_STATE_CREATED;
            m_errno = cast(nng_errno) 0;
        }

    } // this

    /// Closes every context in `m_ctx`, then the socket. Stops at the first
    /// failing context close and returns its error code.
    int close() nothrow {
        int rc;
        m_errno = cast(nng_errno) 0;
        foreach (ctx; m_ctx) {
            rc = nng_ctx_close(ctx);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
        }
        rc = nng_close(m_socket);
        if (rc == 0) {
            m_state = nng_socket_state.NNG_STATE_NONE;
        }
        else {
            m_errno = cast(nng_errno) rc;
        }
        return rc;
    }

    // setup listener

    /// Creates (but does not start) a listener for `url`; requires state CREATED.
    int listener_create(const(string) url) {
        m_errno = cast(nng_errno) 0;
        if (m_state == nng_socket_state.NNG_STATE_CREATED) {
            auto rc = nng_listener_create(&m_listener, m_socket, toStringz(url));
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            m_state = nng_socket_state.NNG_STATE_PREPARED;
            return 0;
        }
        else {
            return -1;
        }
    }

    /// Starts the listener made by `listener_create`; requires state PREPARED.
    int listener_start(const bool nonblock = false) {
        m_errno = cast(nng_errno) 0;
        if (m_state == nng_socket_state.NNG_STATE_PREPARED) {
            auto rc = nng_listener_start(m_listener, nonblock ? nng_flag.NNG_FLAG_NONBLOCK : 0);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            m_state = nng_socket_state.NNG_STATE_CONNECTED;
            return 0;
        }
        else {
            return -1;
        }
    }

    /// Creates and starts a listener for `url` in one step; requires state CREATED.
    int listen(const(string) url, const bool nonblock = false) nothrow {
        m_errno = cast(nng_errno) 0;
        if (m_state == nng_socket_state.NNG_STATE_CREATED) {
            auto rc = nng_listen(m_socket, toStringz(url), &m_listener, nonblock ? nng_flag.NNG_FLAG_NONBLOCK : 0);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            m_state = nng_socket_state.NNG_STATE_CONNECTED;
            return 0;
        }
        else {
            return -1;
        }
    }

    // setup subscriber

    /// Subscribes to `tag`; a tag that is already recorded is a no-op returning 0.
    int subscribe(string tag) nothrow {
        if (m_subscriptions.canFind(tag))
            return 0;
        setopt_buf(NNG_OPT_SUB_SUBSCRIBE, cast(ubyte[])(tag.dup));
        if (m_errno == 0)
            m_subscriptions ~= tag;
        return m_errno;
    }

    /// Unsubscribes from `tag`; a tag that is not recorded is a no-op returning 0.
    int unsubscribe(string tag) nothrow {
        long i = m_subscriptions.countUntil(tag);
        if (i < 0)
            return 0;
        setopt_buf(NNG_OPT_SUB_UNSUBSCRIBE, cast(ubyte[])(tag.dup));
        if (m_errno == 0)
            m_subscriptions = m_subscriptions[0 .. i] ~ m_subscriptions[i + 1 .. $];
        return m_errno;
    }

    /// Unsubscribes from every recorded tag; stops at and returns the first error.
    int clearsubscribe() nothrow {
        long i;
        // The foreach iterates the slice as it was on entry; each removal
        // below builds a new array, so the iteration is not disturbed.
        foreach (tag; m_subscriptions) {
            i = m_subscriptions.countUntil(tag);
            if (i < 0)
                continue;
            setopt_buf(NNG_OPT_SUB_UNSUBSCRIBE, cast(ubyte[])(tag.dup));
            if (m_errno != 0)
                return m_errno;
            m_subscriptions = m_subscriptions[0 .. i] ~ m_subscriptions[i + 1 .. $];
        }
        return 0;
    }

    string[] subscriptions() nothrow {
        return m_subscriptions;
    }

    // setup dialer

    /// Creates (but does not start) a dialer for `url`; requires state CREATED.
    int dialer_create(const(string) url) nothrow {
        m_errno = cast(nng_errno) 0;
        if (m_state == nng_socket_state.NNG_STATE_CREATED) {
            auto rc = nng_dialer_create(&m_dialer, m_socket, toStringz(url));
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            m_state = nng_socket_state.NNG_STATE_PREPARED;
            return 0;
        }
        else {
            return -1;
        }
    }

    /// Starts the dialer made by `dialer_create`; requires state PREPARED.
    int dialer_start(const bool nonblock = false) nothrow {
        m_errno = cast(nng_errno) 0;
        if (m_state == nng_socket_state.NNG_STATE_PREPARED) {
            auto rc = nng_dialer_start(m_dialer, nonblock ? nng_flag.NNG_FLAG_NONBLOCK : 0);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            m_state = nng_socket_state.NNG_STATE_CONNECTED;
            return 0;
        }
        else {
            return -1;
        }
    }

    /// Creates and starts a dialer for `url` in one step; requires state CREATED.
    int dial(const(string) url, const bool nonblock = false) nothrow {
        m_errno = cast(nng_errno) 0;
        if (m_state == nng_socket_state.NNG_STATE_CREATED) {
            auto rc = nng_dial(m_socket, toStringz(url), &m_dialer, nonblock ? nng_flag.NNG_FLAG_NONBLOCK : 0);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            m_state = nng_socket_state.NNG_STATE_CONNECTED;
            return 0;
        }
        else {
            return -1;
        }
    }

    // send & receive TODO: Serialization for objects and structures - see protobuf or hibon?

    /// Sends `msg`; returns 0 on success, -1 on failure (error code in `m_errno`)
    /// or when the socket is not CONNECTED.
    // NOTE(review): `msg` keeps its pointer after a successful send — confirm
    // message ownership against the nng_sendmsg documentation.
    int sendmsg(ref NNGMessage msg, bool nonblock = false) {
        m_errno = nng_errno.init;
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            m_errno = (() @trusted => cast(nng_errno) nng_sendmsg(m_socket, msg.pointer, nonblock ? nng_flag
                    .NNG_FLAG_NONBLOCK : 0))();
            if (m_errno !is nng_errno.init) {
                return -1;
            }
            return 0;
        }
        return -1;
    }

    /// Sends a byte-sized-element array; returns 0, the nng error code, or -1
    /// when the socket is not CONNECTED.
    int send(T)(const(T) data, bool nonblock = false) if (isArray!T) {
        alias U = ForeachType!T;
        static assert(U.sizeof == 1, "None byte size array element are not supported");
        m_errno = nng_errno.init;
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            auto rc = nng_send(m_socket, ptr(cast(ubyte[]) data), data.length, nonblock ? nng_flag.NNG_FLAG_NONBLOCK : 0);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return rc;
            }
            return 0;
        }
        return -1;
    }

    /// Starts an asynchronous send on `aio`; returns 0 when started, 1 when the
    /// aio handle is null, -1 when the socket is not CONNECTED.
    int sendaio(ref NNGAio aio) {
        m_errno = nng_errno.init;
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            if (aio.pointer) {
                nng_send_aio(m_socket, aio.pointer);
                return 0;
            }
            return 1;
        }
        return -1;
    }

    /*
        Receives into a preallocated buffer of at most data.length bytes
        Params:
            data = preallocated buffer
            sz = if sz != 0 then sz is used as the maximum size
            nonblock = set the non blocking mode
        Returns:
            number of actually received bytes, or size_t.max on failure
            (error code in m_errno) or when the socket is not connected
    */
    @nogc @safe
    size_t receivebuf(ubyte[] data, size_t sz = 0, bool nonblock = false) nothrow
    in (data.length >= sz)
    in (data.length)
    do {
        m_errno = nng_errno.init;
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            sz = (sz == 0) ? data.length : sz;
            m_errno = (() @trusted => cast(nng_errno) nng_recv(m_socket, ptr(data), &sz, nonblock ? nng_flag
                    .NNG_FLAG_NONBLOCK : 0))();
            if (m_errno !is nng_errno.init) {
                return size_t.max;
            }
            return sz;
        }
        return size_t.max;
    }

    /*
        Receives NNGMessage
        Params:
            msg = message wrapper whose pointer receives the incoming message
            nonblock = set the non blocking mode
        Returns:
            0 on success, -1 on failure (error code in m_errno) or when the
            socket is not connected
    */
    @nogc @safe
    int receivemsg(NNGMessage* msg, bool nonblock = false) nothrow {
        m_errno = nng_errno.init;
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            m_errno = (() @trusted => cast(nng_errno) nng_recvmsg(m_socket, &(msg.msg), nonblock ? nng_flag
                    .NNG_FLAG_NONBLOCK : 0))();
            if (m_errno !is nng_errno.init) {
                return -1;
            }
            return 0;
        }
        return -1;
    }

    /*
        Receives a data type (castable to byte array) as postallocated buffer
        Params:
            nonblock = set the non blocking mode
        Returns:
            the received bytes, or T.init on failure (error code in m_errno)
            or when the socket is not connected
    */
    T receive(T)(bool nonblock = false) if (isArray!T) {
        m_errno = nng_errno.init;
        alias U = ForeachType!T;
        static assert(U.sizeof == 1, "None byte size array element are not supported");
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            void* buf;
            size_t sz;
            // NNG_FLAG_ALLOC must always be set, since `&buf` receives the
            // allocated buffer. The parentheses matter: without them the ALLOC
            // flag was dropped whenever `nonblock` was true.
            auto flags = (nonblock ? nng_flag.NNG_FLAG_NONBLOCK : 0) | nng_flag.NNG_FLAG_ALLOC;
            auto rc = nng_recv(m_socket, &buf, &sz, flags);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
                return T.init;
            }
            // NOTE(review): the buffer comes from nng and is not released in
            // this function; it is only registered with the GC for scanning.
            GC.addRange(buf, sz);
            return (cast(U*) buf)[0 .. sz];
        }
        return T.init;
    }

    /// Starts an asynchronous receive on `aio`; returns 0 when started, 1 when
    /// the aio handle is null, -1 when the socket is not CONNECTED.
    int receiveaio(ref NNGAio aio) {
        m_errno = nng_errno.init;
        if (m_state == nng_socket_state.NNG_STATE_CONNECTED) {
            if (aio.pointer) {
                nng_recv_aio(m_socket, aio.pointer);
                return 0;
            }
            return 1;
        }
        return -1;
    }

    // properties. Note: @property is not needed anymore
    @nogc nothrow pure {
        @property int state() const {
            return m_state;
        }

        @property int errno() const {
            return m_errno;
        }

        @property nng_socket_type type() const {
            return m_type;
        }

        @property string versionstring() {
            import core.stdc.string : strlen;

            return nng_version[0 .. strlen(nng_version)];
        }

        string name() const {
            return m_name;
        }

        /* There is no need to dup the string because it is immutable;
           a copy is only required if the content is going to be changed:
           @property void name(string val) { m_name = val.dup; }
           Without the duplication the function can be @nogc.
        */
        void name(string val) {
            m_name = val;
        }

        @property bool raw() const {
            return m_raw;
        }

    } // nogc nothrow pure

    nothrow {
        @property int proto() {
            return getopt_int(NNG_OPT_PROTO);
        }

        @property string protoname() {
            return getopt_string(NNG_OPT_PROTONAME);
        }

        @property int peer() {
            return getopt_int(NNG_OPT_PEER);
        }

        @property string peername() {
            return getopt_string(NNG_OPT_PEERNAME);
        }

        @property int recvbuf() {
            return getopt_int(NNG_OPT_RECVBUF);
        }

        @property void recvbuf(int val) {
            setopt_int(NNG_OPT_RECVBUF, val);
        }

        @property int sendbuf() {
            return getopt_int(NNG_OPT_SENDBUF);
        }

        @property void sendbuf(int val) {
            setopt_int(NNG_OPT_SENDBUF, val);
        }

        @property int recvfd() {
            return (m_may_recv) ? getopt_int(NNG_OPT_RECVFD) : -1;
        }

        @property int sendfd() {
            return (m_may_send) ? getopt_int(NNG_OPT_SENDFD) : -1;
        }

        @property Duration recvtimeout() {
            return getopt_duration(NNG_OPT_RECVTIMEO);
        }

        @property void recvtimeout(Duration val) {
            setopt_duration(NNG_OPT_RECVTIMEO, val);
        }

        @property Duration sendtimeout() {
            return getopt_duration(NNG_OPT_SENDTIMEO);
        }

        @property void sendtimeout(Duration val) {
            setopt_duration(NNG_OPT_SENDTIMEO, val);
        }

        @property nng_sockaddr locaddr() {
            return (m_may_send) ? getopt_addr(NNG_OPT_LOCADDR, nng_property_base.NNG_BASE_DIALER) : getopt_addr(
                    NNG_OPT_LOCADDR, nng_property_base.NNG_BASE_LISTENER);
        }

        @property nng_sockaddr remaddr() {
            return (m_may_send) ? getopt_addr(NNG_OPT_REMADDR, nng_property_base.NNG_BASE_DIALER) : nng_sockaddr(
                    nng_sockaddr_family.NNG_AF_NONE);
        }
    } // nothrow

    @property string url() {
        if (m_may_send)
            return getopt_string(NNG_OPT_URL, nng_property_base.NNG_BASE_DIALER);
        else if (m_may_recv)
            return getopt_string(NNG_OPT_URL, nng_property_base.NNG_BASE_LISTENER);
        else
            return getopt_string(NNG_OPT_URL, nng_property_base.NNG_BASE_SOCKET);
    }

    @property int maxttl() {
        return getopt_int(NNG_OPT_MAXTTL);
    }
    /// MAXTTL a value between 0 and 255, inclusive. Where 0 is infinite
    @property void maxttl(uint val)
    in (val <= 255, "MAXTTL, hops cannot be greater than 255")
    do {
        setopt_int(NNG_OPT_MAXTTL, val);
    }

    /// Maximum receivable message size, or -1 on failure (error code in `m_errno`).
    /// NNG documents NNG_OPT_RECVMAXSZ as a `size_t` option, so the size
    /// accessors are used; the `int` interface is kept for callers.
    @property int recvmaxsz() {
        return cast(int) getopt_size(NNG_OPT_RECVMAXSZ);
    }

    /// Sets the maximum receivable message size.
    @property void recvmaxsz(int val)
    in (val >= 0, "RECVMAXSZ cannot be negative")
    do {
        setopt_size(NNG_OPT_RECVMAXSZ, cast(size_t) val);
    }

    @property Duration reconnmint() {
        return getopt_duration(NNG_OPT_RECONNMINT);
    }

    @property void reconnmint(Duration val) {
        setopt_duration(NNG_OPT_RECONNMINT, val);
    }

    @property Duration reconnmaxt() {
        return getopt_duration(NNG_OPT_RECONNMAXT);
    }

    @property void reconnmaxt(Duration val) {
        setopt_duration(NNG_OPT_RECONNMAXT, val);
    }

    // TODO: NNG_OPT_IPC_*, NNG_OPT_TLS_*, NNG_OPT_WS_*
private:
    // Option helpers. Each one resets m_errno, performs the call and stores a
    // non-zero return code in m_errno. Getters return a sentinel on failure.
    nothrow {
        void setopt_int(string opt, int val) {
            m_errno = cast(nng_errno) 0;
            auto rc = nng_socket_set_int(m_socket, toStringz(opt), val);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }

        /// Returns the option value, or -1 on failure.
        int getopt_int(string opt) {
            m_errno = cast(nng_errno) 0;
            int p;
            auto rc = nng_socket_get_int(m_socket, toStringz(opt), &p);
            if (rc == 0) {
                return p;
            }
            else {
                m_errno = cast(nng_errno) rc;
                return -1;
            }
        }

        void setopt_ulong(string opt, ulong val) {
            m_errno = cast(nng_errno) 0;
            auto rc = nng_socket_set_uint64(m_socket, toStringz(opt), val);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }

        /// Returns the option value, or ulong.max (-1) on failure.
        ulong getopt_ulong(string opt) {
            m_errno = cast(nng_errno) 0;
            ulong p;
            auto rc = nng_socket_get_uint64(m_socket, toStringz(opt), &p);
            if (rc == 0) {
                return p;
            }
            else {
                m_errno = cast(nng_errno) rc;
                return -1;
            }
        }

        void setopt_size(string opt, size_t val) {
            m_errno = cast(nng_errno) 0;
            auto rc = nng_socket_set_size(m_socket, toStringz(opt), val);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }

        /// Returns the option value, or size_t.max (-1) on failure.
        size_t getopt_size(string opt) {
            m_errno = cast(nng_errno) 0;
            size_t p;
            auto rc = nng_socket_get_size(m_socket, toStringz(opt), &p);
            if (rc == 0) {
                return p;
            }
            else {
                m_errno = cast(nng_errno) rc;
                return -1;
            }
        }

        /// Reads a string option from the socket, dialer or listener selected
        /// by `base`; returns "<none>" on failure.
        string getopt_string(string opt, nng_property_base base = nng_property_base.NNG_BASE_SOCKET) {
            m_errno = cast(nng_errno) 0;
            char* ptr;
            int rc;
            switch (base) {
            case nng_property_base.NNG_BASE_DIALER:
                rc = nng_dialer_get_string(m_dialer, cast(const char*) toStringz(opt), &ptr);
                break;
            case nng_property_base.NNG_BASE_LISTENER:
                rc = nng_listener_get_string(m_listener, cast(const char*) toStringz(opt), &ptr);
                break;
            default:
                rc = nng_socket_get_string(m_socket, cast(const char*) toStringz(opt), &ptr);
                break;
            }
            if (rc == 0) {
                return to!string(ptr);
            }
            else {
                m_errno = cast(nng_errno) rc;
                return "<none>";
            }
        }

        void setopt_string(string opt, string val) {
            m_errno = cast(nng_errno) 0;
            auto rc = nng_socket_set_string(m_socket, toStringz(opt), toStringz(val));
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }

        void setopt_buf(string opt, ubyte[] val) {
            m_errno = cast(nng_errno) 0;
            auto rc = nng_socket_set(m_socket, toStringz(opt), ptr(val), val.length);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }

        /// Returns the option value as a Duration, or `infiniteDuration` on failure.
        Duration getopt_duration(string opt) {
            m_errno = cast(nng_errno) 0;
            nng_duration p;
            auto rc = nng_socket_get_ms(m_socket, toStringz(opt), &p);
            if (rc == 0) {
                return msecs(p);
            }
            else {
                m_errno = cast(nng_errno) rc;
                return infiniteDuration;
            }
        }

        /// Sets a millisecond option. `infiniteDuration` is sent as -1 (NNG's
        /// infinite duration) rather than being truncated to an arbitrary int.
        void setopt_duration(string opt, Duration val) {
            m_errno = cast(nng_errno) 0;
            immutable int ms = (val == infiniteDuration) ? -1 : cast(int) val.total!"msecs";
            auto rc = nng_socket_set_ms(m_socket, cast(const char*) toStringz(opt), ms);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }

        /// Reads an address option from the socket, dialer or listener selected
        /// by `base`; on failure the returned address has family NNG_AF_NONE.
        nng_sockaddr getopt_addr(string opt, nng_property_base base = nng_property_base.NNG_BASE_SOCKET) {
            m_errno = cast(nng_errno) 0;
            nng_sockaddr addr;
            int rc;
            switch (base) {
            case nng_property_base.NNG_BASE_DIALER:
                rc = nng_dialer_get_addr(m_dialer, toStringz(opt), &addr);
                break;
            case nng_property_base.NNG_BASE_LISTENER:
                rc = nng_listener_get_addr(m_listener, toStringz(opt), &addr);
                break;
            default:
                rc = nng_socket_get_addr(m_socket, toStringz(opt), &addr);
                break;
            }
            if (rc == 0) {
                return addr;
            }
            else {
                m_errno = cast(nng_errno) rc;
                addr.s_family = nng_sockaddr_family.NNG_AF_NONE;
                return addr;
            }
        }

        void setopt_addr(string opt, nng_sockaddr val) {
            m_errno = cast(nng_errno) 0;
            auto rc = nng_socket_set_addr(m_socket, cast(const char*) toStringz(opt), &val);
            if (rc != 0) {
                m_errno = cast(nng_errno) rc;
            }
        }
    } // nothrow
} // struct Socket

alias nng_pool_callback = void function(NNGMessage*, void*);

/// State of a pool worker, advanced by `nng_pool_stateful`.
enum nng_worker_state {
    EXIT = -1,
    NONE = 0,
    RECV = 1,
    WAIT = 2,
    SEND = 4
}

/**
 * One worker of an `NNGPool`: its own context, aio, message and mutex.
 */
struct NNGPoolWorker {
    int id;
    nng_worker_state state;
    NNGMessage msg;
    NNGAio aio;
    Duration delay;
    nng_mtx* mtx;
    nng_ctx ctx;
    void* context;
    File* logfile;
    nng_pool_callback cb;
    this(int iid, void* icontext, File* ilog) {
        this.id = iid;
        this.context = icontext;
        this.logfile = ilog;
        this.state = nng_worker_state.NONE;
        this.msg = NNGMessage(0);
        this.aio = NNGAio(null, null);
        this.delay = msecs(0);
        this.cb = null;
        auto rc = nng_mtx_alloc(&this.mtx);
        enforce(rc == 0, "PW: init");
    }

    void lock() {
        nng_mtx_lock(mtx);
    }

    void unlock() {
        nng_mtx_unlock(mtx);
    }

    void wait() {
        this.aio.wait();
    }

    /// Marks the worker as exiting and stops its aio.
    void shutdown() {
        this.state = nng_worker_state.EXIT;
        this.aio.stop();
    }
} // struct NNGPoolWorker

/**
 * Worker state machine, used as the aio callback of every pool worker.
 *
 * `p` is the `NNGPoolWorker*`. The cycle is NONE -> RECV -> WAIT -> SEND -> RECV:
 * receive a message, sleep for `delay`, run the user callback, send the
 * (possibly modified) message back, then receive again.
 * The worker mutex is held for the duration of the call and released on every
 * exit path.
 */
extern (C) void nng_pool_stateful(void* p) {
    if (p is null)
        return;
    NNGPoolWorker* w = cast(NNGPoolWorker*) p;
    w.lock();
    // Release on every path. The failed-send path previously returned with
    // the mutex still held.
    scope (exit)
        w.unlock();
    nng_errno rc;
    switch (w.state) {
    case nng_worker_state.EXIT:
        return;
    case nng_worker_state.NONE:
        w.state = nng_worker_state.RECV;
        nng_ctx_recv(w.ctx, w.aio.aio);
        break;
    case nng_worker_state.RECV:
        // On a failed receive or a missing message, re-arm the receive.
        if (w.aio.result != nng_errno.NNG_OK) {
            nng_ctx_recv(w.ctx, w.aio.aio);
            break;
        }
        rc = w.aio.get_msg(w.msg);
        if (rc != nng_errno.NNG_OK) {
            nng_ctx_recv(w.ctx, w.aio.aio);
            break;
        }
        w.state = nng_worker_state.WAIT;
        w.aio.sleep(w.delay);
        break;
    case nng_worker_state.WAIT:
        try {
            w.cb(&w.msg, w.context);
        }
        catch (Exception e) {
            if (w.logfile !is null) {
                auto f = *(w.logfile);
                f.write(format("Error in pool callback: [%d:%s] %s\n", e.line, e.file, e.msg));
                f.flush();
            }
            // A failing callback results in an empty reply body.
            w.msg.clear();
        }
        finally {
            w.aio.set_msg(w.msg);
            w.state = nng_worker_state.SEND;
            nng_ctx_send(w.ctx, w.aio.aio);
        }
        break;
    case nng_worker_state.SEND:
        rc = w.aio.result;
        if (rc != nng_errno.NNG_OK) {
            return;
        }
        w.state = nng_worker_state.RECV;
        nng_ctx_recv(w.ctx, w.aio.aio);
        break;
    default:
        enforce(false, "Bad pool worker state");
        break;
    }
}

/**
 * Pool of `n` workers serving a REP socket, each with its own context.
 */
struct NNGPool {
    NNGSocket* sock;
    void* context;
    File _logfile;
    File* logfile;
    size_t nworkers;

    NNGPoolWorker*[] workers;

    @disable this();

    /**
     * Params:
     *   isock = REP socket in state CREATED or CONNECTED
     *   cb = non-null callback run for every received message
     *   n = number of workers
     *   icontext = opaque pointer handed to `cb`
     *   logfd = file descriptor for callback error logging, -1 to disable
     */
    this(NNGSocket* isock, nng_pool_callback cb, size_t n, void* icontext, int logfd = -1) {
        enforce(isock.state == nng_socket_state.NNG_STATE_CREATED || isock.state == nng_socket_state.NNG_STATE_CONNECTED);
        enforce(isock.type == nng_socket_type.NNG_SOCKET_REP); // TODO: extend to surveyor
        enforce(cb != null);
        sock = isock;
        context = icontext;
        if (logfd == -1) {
            logfile = null;
        }
        else {
            _logfile = File("/dev/null", "wt");
            _logfile.fdopen(logfd, "wt");
            logfile = &_logfile;
        }
        nworkers = n;
        for (auto i = 0; i < n; i++) {
            NNGPoolWorker* w = new NNGPoolWorker(i, context, logfile);
            w.aio.realloc(cast(nng_aio_cb)(&nng_pool_stateful), cast(void*) w);
            w.cb = cb;
            auto rc = nng_ctx_open(&w.ctx, sock.m_socket);
            enforce(rc == 0);
            workers ~= w;
        }
    }

    /// Kicks every worker's state machine once.
    void init() {
        enforce(nworkers > 0);
        for (auto i = 0; i < nworkers; i++) {
            nng_pool_stateful(workers[i]);
        }
    }

    /// Shuts every worker down, then waits for each worker's aio.
    void shutdown() {
        enforce(nworkers > 0);
        for (auto i = 0; i < nworkers; i++) {
            workers[i].shutdown();
        }
        for (auto i = 0; i < nworkers; i++) {
            workers[i].wait();
        }
    }
} // struct NNGPool

// ------------------ WebApp classes

alias nng_http_status = libnng.nng_http_status;
alias http_status = nng_http_status;

alias nng_http_req = libnng.nng_http_req;
alias nng_http_res = libnng.nng_http_res;

version (withtls) {

    alias nng_tls_mode = libnng.nng_tls_mode;
    alias nng_tls_auth_mode = libnng.nng_tls_auth_mode;
    alias nng_tls_version = libnng.nng_tls_version;

    struct WebTLS {
        nng_tls_config* tls;

        @disable this();

        this(ref return scope WebTLS rhs) {
        }

        this(nng_tls_mode imode) {
            int rc;
            rc = nng_tls_config_alloc(&tls, imode);
            enforce(rc == 0, "TLS config init");
            nng_tls_config_hold(tls);
        }

        ~this() {
            nng_tls_config_free(tls);
        }

        void set_server_name(string iname) {
            auto rc = nng_tls_config_server_name(tls, iname.toStringz());
            enforce(rc == 0);
        }

        void set_ca_chain(string pem, string crl = "") {
            auto rc = nng_tls_config_ca_chain(tls, pem.toStringz(), crl.toStringz());
            enforce(rc == 0);
        }

        void set_own_cert(string pem, string key, string pwd = "") {
            auto rc = nng_tls_config_own_cert(tls, pem.toStringz(), key.toStringz(), pwd.toStringz());
            enforce(rc == 0);
        }

        // TODO: check why this two excluded from the lib
        /*
        void set_pass ( string ipass ) {
            auto rc = nng_tls_config_pass(tls, ipass.toStringz());
            enforce(rc == 0);
        }

        void set_key ( ubyte[] ipass ) {
            auto rc = nng_tls_config_key(tls, ipass.ptr, ipass.length);
            enforce(rc == 0);
        }
        */

        void set_auth_mode(nng_tls_auth_mode imode) {
            auto rc = nng_tls_config_auth_mode(tls, imode);
            enforce(rc == 0);
        }

        void set_ca_file(string ica) {
            auto rc = nng_tls_config_ca_file(tls, ica.toStringz());
            enforce(rc == 0);
        }

        void set_cert_key_file(string ipem, string ikey) {
            auto rc = nng_tls_config_cert_key_file(tls, ipem.toStringz(), ikey.toStringz());
            writeln("TDEBUG: ", nng_errstr(rc));
            enforce(rc == 0);
        }

        void set_version(nng_tls_version iminversion, nng_tls_version imaxversion) {
            auto rc = nng_tls_config_version(tls, iminversion, imaxversion);
            enforce(rc == 0);
        }

        string engine_name() {
            char* buf = nng_tls_engine_name();
            return to!string(buf);
        }

        string engine_description() {
            char* buf = nng_tls_engine_description();
            return to!string(buf);
        }

        bool fips_mode() {
return nng_tls_engine_fips_mode(); 1530 } 1531 } 1532 1533 } 1534 1535 struct WebAppConfig { 1536 string root_path = ""; 1537 string static_path = ""; 1538 string static_url = ""; 1539 string template_path = ""; 1540 string prefix_url = ""; 1541 this(ref return scope WebAppConfig rhs) { 1542 } 1543 }; 1544 1545 struct WebData { 1546 string route = null; 1547 string rawuri = null; 1548 string uri = null; 1549 string[] path = null; 1550 string[string] param = null; 1551 string[string] headers = null; 1552 string type = "text/html"; 1553 size_t length = 0; 1554 string method = null; 1555 ubyte[] rawdata = null; 1556 string text = null; 1557 JSONValue json = null; 1558 http_status status = http_status.NNG_HTTP_STATUS_NOT_IMPLEMENTED; 1559 string msg = null; 1560 1561 void clear() { 1562 route = ""; 1563 rawuri = ""; 1564 uri = ""; 1565 status = http_status.NNG_HTTP_STATUS_NOT_IMPLEMENTED; 1566 msg = ""; 1567 path = []; 1568 param = null; 1569 headers = null; 1570 type = "text/html"; 1571 length = 0; 1572 method = ""; 1573 rawdata = []; 1574 text = ""; 1575 json = null; 1576 } 1577 1578 string toString() nothrow { 1579 try { 1580 return format(` 1581 <Webdata> 1582 route: %s 1583 rawuri: %s 1584 uri: %s 1585 status: %s 1586 msg: %s 1587 path: %s 1588 param: %s 1589 headers: %s 1590 type: %s 1591 length: %d 1592 method: %s 1593 len(data): %s 1594 text: %s 1595 json: %s 1596 </WebData> 1597 `, 1598 route, rawuri, uri, status, msg, path, param, headers, type, length, method, rawdata.length, to!string( 1599 text), json.toString() 1600 ); 1601 } 1602 catch (Exception e) { 1603 perror("WD: toString error"); 1604 return null; 1605 } 1606 } 1607 1608 void parse_req(nng_http_req* req) { 1609 enforce(req != null); 1610 } 1611 1612 // TODO: find the way to list all headers 1613 1614 void parse_res(nng_http_res* res) { 1615 enforce(res != null); 1616 clear(); 1617 status = cast(http_status) nng_http_res_get_status(res); 1618 msg = to!string(nng_http_res_get_reason(res)); 1619 type 
= to!string(nng_http_res_get_header(res, toStringz("Content-type"))); 1620 ubyte* buf; 1621 size_t len; 1622 nng_http_res_get_data(res, cast(void**)(&buf), &len); 1623 if (len > 0) { 1624 rawdata ~= buf[0 .. len]; 1625 } 1626 if (type.startsWith("application/json")) { 1627 json = parseJSON(cast(string)(rawdata[0 .. len])); 1628 } 1629 else if (type.startsWith("text")) { 1630 text = cast(string)(rawdata[0 .. len]); 1631 } 1632 length = len; 1633 auto hlength = to!long(to!string(nng_http_res_get_header(res, toStringz("Content-length")))); 1634 enforce(hlength == length); 1635 } 1636 1637 nng_http_req* export_req() { 1638 nng_http_req* req; 1639 nng_url* url; 1640 int rc; 1641 rc = nng_url_parse(&url, ((rawuri.length > 0) ? rawuri : "http://<unknown>" ~ uri).toStringz()); 1642 rc = nng_http_req_alloc(&req, url); 1643 enforce(rc == 0); 1644 rc = nng_http_req_set_method(req, method.toStringz()); 1645 enforce(rc == 0); 1646 rc = nng_http_req_set_header(req, "Content-type", type.toStringz()); 1647 foreach (k; headers.keys) { 1648 rc = nng_http_req_set_header(req, k.toStringz(), headers[k].toStringz()); 1649 } 1650 if (type.startsWith("application/json")) { 1651 string buf = json.toString(); 1652 rc = nng_http_req_copy_data(req, buf.toStringz(), buf.length); 1653 length = buf.length; 1654 enforce(rc == 0, "webdata: copy json rep"); 1655 } 1656 else if (type.startsWith("text")) { 1657 rc = nng_http_req_copy_data(req, text.toStringz(), text.length); 1658 length = text.length; 1659 enforce(rc == 0, "webdata: copy text rep"); 1660 } 1661 else { 1662 rc = nng_http_req_copy_data(req, rawdata.ptr, rawdata.length); 1663 length = rawdata.length; 1664 enforce(rc == 0, "webdata: copy data rep"); 1665 } 1666 rc = nng_http_req_set_header(req, "Content-length", to!string(length).toStringz()); 1667 return req; 1668 } 1669 1670 nng_http_res* export_res() { 1671 char* buf = cast(char*) alloca(512); 1672 nng_http_res* res; 1673 int rc; 1674 rc = nng_http_res_alloc(&res); 1675 enforce(rc == 
0); 1676 rc = nng_http_res_set_status(res, cast(ushort) status); 1677 enforce(rc == 0); 1678 if (status != nng_http_status.NNG_HTTP_STATUS_OK) { 1679 nng_http_res_reset(res); 1680 rc = nng_http_res_alloc_error(&res, cast(ushort) status); 1681 enforce(rc == 0); 1682 rc = nng_http_res_set_reason(res, msg.toStringz); 1683 enforce(rc == 0); 1684 return res; 1685 } 1686 { 1687 memcpy(buf, type.ptr, type.length); 1688 buf[type.length] = 0; 1689 rc = nng_http_res_set_header(res, "Content-type", buf); 1690 enforce(rc == 0); 1691 } 1692 if (type.startsWith("application/json")) { 1693 scope string sbuf = json.toString(); 1694 memcpy(buf, sbuf.ptr, sbuf.length); 1695 rc = nng_http_res_copy_data(res, buf, sbuf.length); 1696 length = sbuf.length; 1697 enforce(rc == 0, "webdata: copy json rep"); 1698 } 1699 else if (type.startsWith("text")) { 1700 rc = nng_http_res_copy_data(res, text.ptr, text.length); 1701 length = text.length; 1702 enforce(rc == 0, "webdata: copy text rep"); 1703 } 1704 else { 1705 rc = nng_http_res_copy_data(res, rawdata.ptr, rawdata.length); 1706 length = rawdata.length; 1707 enforce(rc == 0, "webdata: copy data rep"); 1708 } 1709 1710 return res; 1711 } 1712 } 1713 1714 alias webhandler = void function(WebData*, WebData*, void*); 1715 1716 //---------------- 1717 void webrouter (nng_aio* aio) { 1718 1719 int rc; 1720 nng_http_res* res; 1721 nng_http_req* req; 1722 nng_http_handler* h; 1723 1724 void* reqbody; 1725 size_t reqbodylen; 1726 1727 WebData sreq = WebData.init; 1728 WebData srep = WebData.init; 1729 WebApp* app; 1730 1731 char* sbuf = cast(char*) nng_alloc(4096); 1732 1733 string errstr = ""; 1734 1735 const char* t1 = "NODATA"; 1736 1737 // TODO: invite something for proper default response for no handlers, maybe 100 or 204 ? To discuss. 
1738 1739 srep.type = "text/plain"; 1740 srep.text = "No result"; 1741 srep.status = nng_http_status.NNG_HTTP_STATUS_OK; 1742 1743 req = cast(nng_http_req*) nng_aio_get_input(aio, 0); 1744 if (req is null) { 1745 errstr = "WR: get request"; 1746 goto failure; 1747 } 1748 1749 h = cast(nng_http_handler*) nng_aio_get_input(aio, 1); 1750 if (req is null) { 1751 errstr = "WR: get handler"; 1752 goto failure; 1753 } 1754 1755 app = cast(WebApp*) nng_http_handler_get_data(h); 1756 if (app is null) { 1757 errstr = "WR: get handler data"; 1758 goto failure; 1759 } 1760 1761 nng_http_req_get_data(req, &reqbody, &reqbodylen); 1762 1763 sreq.method = cast(immutable)(fromStringz(nng_http_req_get_method(req))); 1764 1765 sprintf(sbuf, "Content-type"); 1766 sreq.type = cast(immutable)(fromStringz(nng_http_req_get_header(req, sbuf))); 1767 if (sreq.type.empty) 1768 sreq.type = "text/plain"; 1769 1770 sreq.uri = cast(immutable)(fromStringz(nng_http_req_get_uri(req))); 1771 1772 sreq.rawdata = cast(ubyte[])(reqbody[0 .. 
reqbodylen]); 1773 1774 app.webprocess(&sreq, &srep); 1775 1776 res = srep.export_res; 1777 1778 nng_free(sbuf, 4096); 1779 nng_aio_set_output(aio, 0, res); 1780 nng_aio_finish(aio, 0); 1781 1782 return; 1783 1784 failure: 1785 writeln("ERROR: " ~ errstr); 1786 nng_free(sbuf, 4096); 1787 nng_http_res_free(res); 1788 nng_aio_finish(aio, rc); 1789 } // router handler 1790 1791 struct WebApp { 1792 string name; 1793 WebAppConfig config; 1794 nng_http_server* server; 1795 nng_aio* aio; 1796 nng_url* url; 1797 webhandler[string] routes; 1798 void* context; 1799 1800 @disable this(); 1801 1802 this(string iname, string iurl, WebAppConfig iconfig, void* icontext = null) { 1803 name = iname; 1804 context = icontext; 1805 auto rc = nng_url_parse(&url, iurl.toStringz()); 1806 enforce(rc == 0, "server url parse"); 1807 config = iconfig; 1808 init(); 1809 } 1810 1811 this(string iname, string iurl, JSONValue iconfig, void* icontext = null) { 1812 name = iname; 1813 context = icontext; 1814 auto rc = nng_url_parse(&url, iurl.toStringz()); 1815 enforce(rc == 0, "server url parse"); 1816 if ("root_path" in iconfig) 1817 config.root_path = iconfig["root_path"].str; 1818 if ("static_path" in iconfig) 1819 config.static_path = iconfig["static_path"].str; 1820 if ("static_url" in iconfig) 1821 config.static_url = iconfig["static_url"].str; 1822 if ("template_path" in iconfig) 1823 config.template_path = iconfig["template_path"].str; 1824 if ("prefix_url" in iconfig) 1825 config.prefix_url = iconfig["prefix_url"].str; 1826 init(); 1827 } 1828 1829 version (withtls) { 1830 void set_tls(WebTLS tls) { 1831 auto rc = nng_http_server_set_tls(server, tls.tls); 1832 enforce(rc == 0, "server set tls"); 1833 } 1834 } 1835 1836 void route(string path, webhandler handler, string[] methods = ["GET"]) { 1837 int rc; 1838 bool wildcard = false; 1839 if (path.endsWith("/*")) { 1840 path = path[0 .. 
$ - 2]; 1841 wildcard = true; 1842 } 1843 foreach (m; methods) { 1844 foreach (r; sort(routes.keys)) { 1845 enforce(m ~ ":" ~ path != r, "router path already registered: " ~ m ~ ":" ~ path); 1846 } 1847 routes[m ~ ":" ~ path] = handler; 1848 nng_http_handler* hr; 1849 rc = nng_http_handler_alloc(&hr, toStringz(config.prefix_url ~ path), &webrouter); 1850 enforce(rc == 0, "route handler alloc"); 1851 rc = nng_http_handler_set_method(hr, m.toStringz()); 1852 enforce(rc == 0, "route handler set method"); 1853 rc = nng_http_handler_set_data(hr, &this, null); 1854 enforce(rc == 0, "route handler set context"); 1855 if (wildcard) { 1856 rc = nng_http_handler_set_tree(hr); 1857 enforce(rc == 0, "route handler tree"); 1858 } 1859 rc = nng_http_server_add_handler(server, hr); 1860 enforce(rc == 0, "route handler add"); 1861 } 1862 } 1863 1864 void start() { 1865 auto rc = nng_http_server_start(server); 1866 enforce(rc == 0, "server start"); 1867 } 1868 1869 void stop() { 1870 nng_http_server_stop(server); 1871 } 1872 1873 void webprocess(WebData* req, WebData* rep) { 1874 int rc; 1875 1876 rep.status = nng_http_status.NNG_HTTP_STATUS_OK; 1877 rep.type = "text/plain"; 1878 rep.text = "Test result"; 1879 1880 nng_url* u; 1881 string ss = ("http://localhost" ~ req.uri ~ "\0"); 1882 char[] buf = ss.dup; 1883 rc = nng_url_parse(&u, buf.ptr); 1884 enforce(rc == 0); 1885 req.route = cast(immutable)(fromStringz(u.u_path)).dup; 1886 req.path = req.route.split("/"); 1887 if (req.path.length > 1 && req.path[0] == "") 1888 req.path = req.path[1 .. 
$]; 1889 string query = cast(immutable)(fromStringz(u.u_query)).dup; 1890 foreach (p; query.split("&")) { 1891 auto a = p.split("="); 1892 if (a[0] != "") 1893 req.param[a[0]] = a[1]; 1894 } 1895 nng_url_free(u); 1896 1897 if (req.type.startsWith("application/json")) { 1898 try { 1899 req.json = parseJSON(cast(immutable)(fromStringz(cast(char*) req.rawdata))); 1900 } 1901 catch (JSONException e) { 1902 rep.status = nng_http_status.NNG_HTTP_STATUS_BAD_REQUEST; 1903 rep.msg = "Invalid json"; 1904 return; 1905 } 1906 } 1907 1908 if (req.type.startsWith("text/")) { 1909 req.text = cast(immutable)(fromStringz(cast(char*) req.rawdata)); 1910 } 1911 1912 // TODO: implement full CEF parser for routes 1913 webhandler handler = null; 1914 string rkey = req.method ~ ":" ~ req.route; 1915 foreach (r; sort!("a > b")(routes.keys)) { 1916 if (rkey.startsWith(r)) { 1917 handler = routes[r]; 1918 break; 1919 } 1920 } 1921 if (handler == null) 1922 handler = &default_handler; 1923 1924 handler(req, rep, context); 1925 1926 } 1927 1928 private: 1929 1930 void init() { 1931 int rc; 1932 if (config.root_path == "") 1933 config.root_path = __FILE__.absolutePath.dirName; 1934 if (config.static_path == "") 1935 config.static_path = "/"; 1936 if (config.static_url == "") 1937 config.static_url = config.static_path; 1938 rc = nng_http_server_hold(&server, url); 1939 enforce(rc == 0, "server hold"); 1940 1941 nng_http_handler* hs; 1942 rc = nng_http_handler_alloc_directory(&hs, toStringz(config.prefix_url ~ "/" ~ config.static_path), buildPath( 1943 config.root_path, config.static_url).toStringz()); 1944 enforce(rc == 0, "static handler alloc"); 1945 rc = nng_http_server_add_handler(server, hs); 1946 enforce(rc == 0, "static handler add"); 1947 1948 rc = nng_aio_alloc(&aio, null, null); 1949 enforce(rc == 0, "aio alloc"); 1950 1951 } 1952 1953 static void default_handler(WebData* req, WebData* rep, void* ctx) { 1954 rep.type = "text/plain"; 1955 rep.text = "Default reponse"; 1956 rep.status 
= nng_http_status.NNG_HTTP_STATUS_OK; 1957 } 1958 } // struct WebApp 1959 1960 // for user defined result handlers 1961 alias webclienthandler = void function(WebData*, void*); 1962 1963 // for async client router 1964 extern (C) struct WebClientAsync { 1965 char* uri; 1966 nng_http_req* req; 1967 nng_http_res* res; 1968 nng_aio* aio; 1969 void* context; 1970 webclienthandler commonhandler; 1971 webclienthandler errorhandler; 1972 } 1973 1974 // common async client router 1975 static void webclientrouter(void* p) { 1976 if (p == null) 1977 return; 1978 WebClientAsync* a = cast(WebClientAsync*)(p); 1979 WebData rep = WebData(); 1980 rep.parse_res(a.res); 1981 rep.rawuri = to!string(a.uri); 1982 if (rep.status != nng_http_status.NNG_HTTP_STATUS_OK && a.errorhandler != null) 1983 a.errorhandler(&rep, a.context); 1984 else 1985 a.commonhandler(&rep, a.context); 1986 nng_http_req_free(a.req); 1987 nng_http_res_free(a.res); 1988 nng_aio_free(a.aio); 1989 } 1990 1991 struct WebClient { 1992 1993 nng_http_client* cli; 1994 nng_http_conn* conn; 1995 nng_http_req* req; 1996 nng_http_res* res; 1997 nng_url* url; 1998 bool connected; 1999 2000 // constructor and connector are for future use woth streaming functions 2001 // for single transactions use static members (sync or async ) 2002 2003 this(string uri) { 2004 int rc; 2005 connected = false; 2006 rc = nng_http_res_alloc(&res); 2007 enforce(rc == 0); 2008 rc = nng_http_req_alloc(&req, null); 2009 enforce(rc == 0); 2010 if (uri != null && uri != "") { 2011 rc = connect(uri); 2012 enforce(rc == 0); 2013 } 2014 2015 } 2016 2017 int connect(string uri) { 2018 int rc; 2019 nng_aio* aio; 2020 if (cli != null) 2021 nng_http_client_free(cli); 2022 rc = nng_aio_alloc(&aio, null, null); 2023 enforce(rc == 0); 2024 rc = nng_url_parse(&url, uri.toStringz()); 2025 enforce(rc == 0); 2026 rc = nng_http_client_alloc(&cli, url); 2027 enforce(rc == 0); 2028 nng_http_client_connect(cli, aio); 2029 nng_aio_wait(aio); 2030 rc = 
nng_aio_result(aio); 2031 enforce(rc == 0); 2032 conn = cast(nng_http_conn*) nng_aio_get_output(aio, 0); 2033 enforce(conn != null); 2034 connected = true; 2035 return 0; 2036 } 2037 2038 ~this() { 2039 nng_http_client_free(cli); 2040 nng_url_free(url); 2041 nng_http_req_free(req); 2042 nng_http_res_free(res); 2043 } 2044 2045 // static sync get 2046 static WebData get(string uri, string[string] headers, Duration timeout = 30000.msecs) { 2047 int rc; 2048 nng_http_client* cli; 2049 nng_url* url; 2050 nng_http_req* req; 2051 nng_http_res* res; 2052 nng_aio* aio; 2053 WebData wd = WebData(); 2054 rc = nng_url_parse(&url, uri.toStringz()); 2055 enforce(rc == 0); 2056 rc = nng_http_client_alloc(&cli, url); 2057 enforce(rc == 0); 2058 rc = nng_http_req_alloc(&req, url); 2059 enforce(rc == 0); 2060 rc = nng_http_res_alloc(&res); 2061 enforce(rc == 0); 2062 rc = nng_aio_alloc(&aio, null, null); 2063 enforce(rc == 0); 2064 nng_aio_set_timeout(aio, cast(nng_duration) timeout.total!"msecs"); 2065 2066 scope (exit) { 2067 nng_http_client_free(cli); 2068 nng_url_free(url); 2069 nng_aio_free(aio); 2070 nng_http_req_free(req); 2071 nng_http_res_free(res); 2072 } 2073 2074 rc = nng_http_req_set_method(req, toStringz("GET")); 2075 enforce(rc == 0); 2076 foreach (k; headers.keys) { 2077 rc = nng_http_req_set_header(req, k.toStringz(), headers[k].toStringz()); 2078 enforce(rc == 0); 2079 } 2080 nng_http_client_transact(cli, req, res, aio); 2081 nng_aio_wait(aio); 2082 rc = nng_aio_result(aio); 2083 if (rc == 0) { 2084 wd.parse_res(res); 2085 } 2086 else { 2087 wd.status = nng_http_status.NNG_HTTP_STATUS_REQUEST_TIMEOUT; 2088 wd.msg = nng_errstr(rc); 2089 } 2090 return wd; 2091 } 2092 2093 // static sync post 2094 static WebData post(string uri, ubyte[] data, string[string] headers, Duration timeout = 30000.msecs) { 2095 int rc; 2096 nng_http_client* cli; 2097 nng_url* url; 2098 nng_http_req* req; 2099 nng_http_res* res; 2100 nng_aio* aio; 2101 WebData wd = WebData(); 2102 rc = 
nng_url_parse(&url, uri.toStringz()); 2103 enforce(rc == 0); 2104 rc = nng_http_client_alloc(&cli, url); 2105 enforce(rc == 0); 2106 rc = nng_http_req_alloc(&req, url); 2107 enforce(rc == 0); 2108 rc = nng_http_res_alloc(&res); 2109 enforce(rc == 0); 2110 rc = nng_aio_alloc(&aio, null, null); 2111 enforce(rc == 0); 2112 nng_aio_set_timeout(aio, cast(nng_duration) timeout.total!"msecs"); 2113 scope (exit) { 2114 nng_http_client_free(cli); 2115 nng_url_free(url); 2116 nng_aio_free(aio); 2117 nng_http_req_free(req); 2118 nng_http_res_free(res); 2119 } 2120 rc = nng_http_req_set_method(req, toStringz("POST")); 2121 enforce(rc == 0); 2122 foreach (k; headers.keys) { 2123 rc = nng_http_req_set_header(req, k.toStringz(), headers[k].toStringz()); 2124 enforce(rc == 0); 2125 } 2126 rc = nng_http_req_copy_data(req, data.ptr, data.length); 2127 enforce(rc == 0); 2128 nng_http_client_transact(cli, req, res, aio); 2129 nng_aio_wait(aio); 2130 rc = nng_aio_result(aio); 2131 if (rc == 0) { 2132 wd.parse_res(res); 2133 } 2134 else { 2135 wd.status = nng_http_status.NNG_HTTP_STATUS_REQUEST_TIMEOUT; 2136 wd.msg = nng_errstr(rc); 2137 } 2138 return wd; 2139 2140 } 2141 2142 // static async get 2143 static NNGAio get_async(string uri, string[string] headers, webclienthandler handler, Duration timeout = 30000 2144 .msecs, void* context = null) { 2145 int rc; 2146 nng_aio* aio; 2147 nng_http_client* cli; 2148 nng_http_req* req; 2149 nng_http_res* res; 2150 nng_url* url; 2151 rc = nng_url_parse(&url, uri.toStringz()); 2152 enforce(rc == 0); 2153 rc = nng_http_client_alloc(&cli, url); 2154 enforce(rc == 0); 2155 rc = nng_http_req_alloc(&req, url); 2156 enforce(rc == 0); 2157 rc = nng_http_res_alloc(&res); 2158 enforce(rc == 0); 2159 rc = nng_aio_alloc(&aio, null, null); 2160 enforce(rc == 0); 2161 nng_aio_set_timeout(aio, cast(nng_duration) timeout.total!"msecs"); 2162 WebClientAsync* a = new WebClientAsync(); 2163 a.uri = cast(char*) uri.dup.toStringz(); 2164 a.commonhandler = handler; 
2165 a.context = context; 2166 a.req = req; 2167 a.res = res; 2168 a.aio = aio; 2169 rc = nng_aio_alloc(&aio, &webclientrouter, a); 2170 enforce(rc == 0); 2171 rc = nng_http_req_set_method(req, toStringz("GET")); 2172 enforce(rc == 0); 2173 foreach (k; headers.keys) { 2174 rc = nng_http_req_set_header(req, k.toStringz(), headers[k].toStringz()); 2175 enforce(rc == 0); 2176 } 2177 nng_http_client_transact(cli, req, res, aio); 2178 return NNGAio(aio); 2179 } 2180 2181 // static async post 2182 static NNGAio post_async(string uri, ubyte[] data, string[string] headers, webclienthandler handler, Duration timeout = 30000 2183 .msecs, void* context = null) { 2184 int rc; 2185 nng_aio* aio; 2186 nng_http_client* cli; 2187 nng_http_req* req; 2188 nng_http_res* res; 2189 nng_url* url; 2190 rc = nng_url_parse(&url, uri.toStringz()); 2191 enforce(rc == 0); 2192 rc = nng_http_client_alloc(&cli, url); 2193 enforce(rc == 0); 2194 rc = nng_http_req_alloc(&req, url); 2195 enforce(rc == 0); 2196 rc = nng_http_res_alloc(&res); 2197 enforce(rc == 0); 2198 rc = nng_aio_alloc(&aio, null, null); 2199 enforce(rc == 0); 2200 nng_aio_set_timeout(aio, cast(nng_duration) timeout.total!"msecs"); 2201 WebClientAsync* a = new WebClientAsync(); 2202 a.uri = cast(char*) uri.dup.toStringz(); 2203 a.commonhandler = handler; 2204 a.context = context; 2205 a.req = req; 2206 a.res = res; 2207 a.aio = aio; 2208 rc = nng_aio_alloc(&aio, &webclientrouter, a); 2209 enforce(rc == 0); 2210 rc = nng_http_req_set_method(req, toStringz("POST")); 2211 enforce(rc == 0); 2212 foreach (k; headers.keys) { 2213 rc = nng_http_req_set_header(req, k.toStringz(), headers[k].toStringz()); 2214 enforce(rc == 0); 2215 } 2216 rc = nng_http_req_copy_data(req, data.ptr, data.length); 2217 enforce(rc == 0); 2218 nng_http_client_transact(cli, req, res, aio); 2219 return NNGAio(aio); 2220 } 2221 2222 // common static method for any request methods and error handler ( inspired by ajax ) 2223 // if text is not null data is ignored 
2224 // for methods except POST,PUT,PATCH both text and data are ignored 2225 static NNGAio request( 2226 string method, 2227 string uri, 2228 string[string] headers, 2229 string text, 2230 ubyte[] data, 2231 webclienthandler onsuccess, 2232 webclienthandler onerror, 2233 Duration timeout = 30000.msecs, 2234 void* context = null) { 2235 int rc; 2236 nng_aio* aio; 2237 nng_http_client* cli; 2238 nng_http_req* req; 2239 nng_http_res* res; 2240 nng_url* url; 2241 rc = nng_url_parse(&url, uri.toStringz()); 2242 enforce(rc == 0); 2243 rc = nng_http_client_alloc(&cli, url); 2244 enforce(rc == 0); 2245 rc = nng_http_req_alloc(&req, url); 2246 enforce(rc == 0); 2247 rc = nng_http_res_alloc(&res); 2248 enforce(rc == 0); 2249 rc = nng_aio_alloc(&aio, null, null); 2250 enforce(rc == 0); 2251 nng_aio_set_timeout(aio, cast(nng_duration) timeout.total!"msecs"); 2252 WebClientAsync* a = new WebClientAsync(); 2253 a.uri = cast(char*) uri.dup.toStringz(); 2254 a.commonhandler = onsuccess; 2255 a.errorhandler = onerror; 2256 a.context = context; 2257 a.req = req; 2258 a.res = res; 2259 a.aio = aio; 2260 rc = nng_aio_alloc(&aio, &webclientrouter, a); 2261 enforce(rc == 0); 2262 rc = nng_http_req_set_method(req, toStringz(method)); 2263 enforce(rc == 0); 2264 foreach (k; headers.keys) { 2265 rc = nng_http_req_set_header(req, k.toStringz(), headers[k].toStringz()); 2266 enforce(rc == 0); 2267 } 2268 if (method == "POST" || method == "PUT" || method == "PATCH") { 2269 if (text == null) { 2270 rc = nng_http_req_copy_data(req, data.ptr, data.length); 2271 } 2272 else { 2273 rc = nng_http_req_copy_data(req, text.toStringz(), text.length); 2274 } 2275 enforce(rc == 0); 2276 } 2277 nng_http_client_transact(cli, req, res, aio); 2278 return NNGAio(aio); 2279 } 2280 2281 }