Thread overview
KQueue and Fibers
Apr 09, 2021
rashir
Apr 09, 2021
Arjan
Apr 09, 2021
rashir
Apr 09, 2021
Jacob Carlborg
Apr 10, 2021
rashir
April 09, 2021

Goodmorning everyone,
I'm trying to understand both Kqueue and Fiber's operation on Mac. Why don't I get the correct data as long as I read from the socket?
It seems to be reading too early, but Kquue tells me that the socket is readable.

unittest
{
    import core.thread.fiber;
    import core.sys.darwin.sys.event;
    import core.sys.posix.unistd : close;
    import core.sys.posix.sys.socket;
    import core.sys.posix.netinet.in_;
    import core.sys.posix.fcntl;
    import core.stdc.errno;
    import std.stdio;

    int kq = kqueue();
    assert(kq != -1 && kq > 0);
    scope (exit)
        close(kq);

    int fd;
    long writeableAmount;
    long readableAmount;

    void fiberFunc()
    {
        writeln("fiber started ...");
        fd = socket(AF_INET, SOCK_STREAM, 0); // zero is the 'protocol'
        assert(fd > 0);
        scope (exit)
            close(fd);
        int fcntl_flags = fcntl(fd, F_GETFL);
        assert(fcntl_flags >= 0);
        fcntl_flags = fcntl(fd, F_SETFL, fcntl_flags | O_NONBLOCK);
        assert(fcntl_flags >= 0);

        sockaddr_in serv_addr;
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(80);
        serv_addr.sin_addr.s_addr = 0x44B8FA8E; // 142.250.184.68  www.google.com (right now)

        int r = connect(fd, cast(sockaddr*)&serv_addr, serv_addr.sizeof);
        assert(r == -1 && errno == 36); // 36: EINPROGRESS
        writeln("yield for connect");
        Fiber.yield(); // connecting

        writeln("yield for writability");
        Fiber.yield(); // waiting writability
        string buf = "GET index.html\r\n";
        const bytesWritten = core.sys.posix.sys.socket.send(fd, buf.ptr,
                writeableAmount < buf.length ? writeableAmount : buf.length, 0);
        assert(bytesWritten != EAGAIN && bytesWritten != EWOULDBLOCK);
        assert(bytesWritten == 16);
        writeln(bytesWritten, " bytes written, errno:", errno);

        writeln("yield for readibility");
        Fiber.yield(); // wait for readibility

        char[] b;
        b.length = 2048;
        const bytesRead = recv(fd, b.ptr, readableAmount < b.length ? readableAmount : b.length, 0);
        writeln("read bytesRead: ", bytesRead, "readableAmount:",
                readableAmount, " errno:", errno);
        assert(bytesRead != EAGAIN && bytesRead != EWOULDBLOCK);

        writeln("string received:", b);
    }

    auto fiber = new Fiber(&fiberFunc);
    fiber.call();
    {
        writeln("kqueue waiting for writeability (socket connect)");
        kevent64_s[1] changeList;
        EV_SET64(changeList.ptr, fd, EVFILT_WRITE, EV_ADD, 0, 0, 0, [0, 0]);
        int n = kevent64(kq, changeList.ptr, 1, null, 0, 0, null);
        assert(n == 0);

        kevent64_s[1] eventList;
        n = kevent64(kq, null, 0, eventList.ptr, eventList.length, 0, null);
        assert(n == 1);

        assert(eventList[0].ident == fd);
        assert(eventList[0].filter & EVFILT_WRITE);
        assert((eventList[0].flags & EV_EOF) == 0);
    }
    writeln("resuming as the descriptor is writeable, so connected (socket write)");
    fiber.call();
    {
        writeln("kqueue waiting for writeability");
        kevent64_s[1] changeList;
        EV_SET64(changeList.ptr, fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT | EV_ENABLE, 0, 0, 0, [
                0, 0
                ]);
        int n = kevent64(kq, changeList.ptr, 1, null, 0, 0, null);

        kevent64_s[1] eventList;
        n = kevent64(kq, null, 0, eventList.ptr, eventList.length, 0, null);
        assert(n == 1);

        assert(eventList[0].ident == fd);
        assert(eventList[0].filter & EVFILT_WRITE);

        writeableAmount = eventList[0].data;
    }
    writeln("resuming fiber as it's writeable");
    fiber.call();
    {
        writeln("kqueue waiting for readibilty");
        kevent64_s[1] changeList;
        EV_SET64(changeList.ptr, fd, EVFILT_READ, EV_ADD | EV_ONESHOT | EV_ENABLE, 0, 0, 0, [
                0, 0
                ]);
        int n = kevent64(kq, changeList.ptr, 1, null, 0, 0, null);
        assert(n == 0);

        kevent64_s[1] eventList;
        n = kevent64(kq, null, 0, eventList.ptr, eventList.length, 0, null);
        assert(n == 1);

        assert(eventList[0].ident == fd);
        assert(eventList[0].filter & EVFILT_READ);

        readableAmount = eventList[0].data;
    }
    writeln("resuming fiber as it's readable ", readableAmount);
    fiber.call();
}


fiber started ...
yield for connect
kqueue waiting for writeability (socket connect)
resuming as the descriptor is writeable, so connected (socket write)
yield for writability
kqueue waiting for writeability
resuming fiber as it's writeable
16 bytes written, errno:36
yield for readibility
kqueue waiting for readibilty
resuming fiber as it's readable 131858
read bytesRead: -1readableAmount:131858 errno:35
string received:��������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
1 modules passed unittests
April 09, 2021

On Friday, 9 April 2021 at 09:00:17 UTC, rashir wrote:

>

Goodmorning everyone,
I'm trying to understand both Kqueue and Fiber's operation on Mac. Why don't I get the correct data as long as I read from the socket?
It seems to be reading too early, but Kquue tells me that the socket is readable.

...

>

yield for readibility
kqueue waiting for readibilty
resuming fiber as it's readable 131858
read bytesRead: -1readableAmount:131858 errno:35

35 == EAGAIN This informs the operation (recv) could not complete without blocking and should be retried. This does not mean the socket is not readable, but the operation would block (for whatever reason).

April 09, 2021

On Friday, 9 April 2021 at 09:49:24 UTC, Arjan wrote:

>

On Friday, 9 April 2021 at 09:00:17 UTC, rashir wrote:

>

Goodmorning everyone,
I'm trying to understand both Kqueue and Fiber's operation on Mac. Why don't I get the correct data as long as I read from the socket?
It seems to be reading too early, but Kquue tells me that the socket is readable.

...

>

yield for readibility
kqueue waiting for readibilty
resuming fiber as it's readable 131858
read bytesRead: -1readableAmount:131858 errno:35

35 == EAGAIN This informs the operation (recv) could not complete without blocking and should be retried. This does not mean the socket is not readable, but the operation would block (for whatever reason).

Thank you, but I don't think I understand, it's not exactly KQueue's purpose to know when I can read in the socket without blocking myself?
Why just before the recv reported that there are 131858 bytes, while in reality it is not yet possible to read from the socket?
Thank you

April 09, 2021
On 2021-04-09 11:00, rashir wrote:
> Goodmorning everyone,
> I'm trying to understand both Kqueue and Fiber's operation on Mac. Why don't I get the correct data as long as I read from the socket?
> It seems to be reading too early, but Kquue tells me that the socket is readable.
> 
> ```D
>         const bytesRead = recv(fd, b.ptr, readableAmount < b.length ? readableAmount : b.length, 0);
>         writeln("read bytesRead: ", bytesRead, "readableAmount:",
>                 readableAmount, " errno:", errno);
>         assert(bytesRead != EAGAIN && bytesRead != EWOULDBLOCK);
> ```

`recv` returns the number of bytes received or `-1` if an error occurred. `EAGAIN` and `EWOULDBLOCK` are error codes. You should not compare the value returned by `recv` with error codes. The error code will be placed in `errno`.

> ```D
>         assert(eventList[0].filter & EVFILT_READ);
> ```

The `filter` field of an event is not a flag/bit field. It's just a plain value, you should use `==` to check if it's a read event.

I'm not sure if fixing these things will solve your issue. But at least some problems I noticed.

-- 
/Jacob Carlborg
April 10, 2021
On Friday, 9 April 2021 at 18:37:43 UTC, Jacob Carlborg wrote:
> On 2021-04-09 11:00, rashir wrote:
>> Goodmorning everyone,
>> I'm trying to understand both Kqueue and Fiber's operation on Mac. Why don't I get the correct data as long as I read from the socket?
>> It seems to be reading too early, but Kquue tells me that the socket is readable.
>> 
>> ```D
>>         const bytesRead = recv(fd, b.ptr, readableAmount < b.length ? readableAmount : b.length, 0);
>>         writeln("read bytesRead: ", bytesRead, "readableAmount:",
>>                 readableAmount, " errno:", errno);
>>         assert(bytesRead != EAGAIN && bytesRead != EWOULDBLOCK);
>> ```
>
> `recv` returns the number of bytes received or `-1` if an error occurred. `EAGAIN` and `EWOULDBLOCK` are error codes. You should not compare the value returned by `recv` with error codes. The error code will be placed in `errno`.
>
>> ```D
>>         assert(eventList[0].filter & EVFILT_READ);
>> ```
>
> The `filter` field of an event is not a flag/bit field. It's just a plain value, you should use `==` to check if it's a read event.
>
> I'm not sure if fixing these things will solve your issue. But at least some problems I noticed.

Thank you

Correcting the asserts I managed to understand the problem.

A EV_ONESHOT was missing in the first EVFILT_WRITE, so I was receiving a notification of Write, interpreting it as a notification of Read.

Cheers