WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream #201

Closed
wants to merge 0 commits into from

Conversation

forax
Member

@forax forax commented Sep 1, 2023

This is a rough minimal patch that adds a new subclass of StructuredTaskScope named Streamable (better name needed), which pushes failed or succeeding subtasks into a Stream.

This subclass aims to:

  • make it easier for users to use STS without having to override handleCompleted, which is called concurrently and is therefore hard to get right, at the price of being a little less efficient
  • ease the implementation of short-circuiting semantics, such as getting the first two values or the first value greater than a threshold, by automatically shutting down the STS once the condition is true

The Streamable STS adds a new method joinWhen(function), which takes a function that receives a Stream of subtasks and returns a value:

  public <U> U joinWhen(Function<? super Stream<Subtask<T>>, ? extends U> mapper) throws InterruptedException {

When this method is called, the subtasks whose state is not UNAVAILABLE are pushed into the Stream as they finish, until either there are no more subtasks or the stream has finished (has been short-circuited). If some subtasks are still pending because the stream has been short-circuited, they are shut down.

Here are two examples:

  • get a list of all the values that succeed
    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
            streamable.fork(() -> {
                Thread.sleep(200);
                return 12;
            });
            streamable.fork(() -> {
                Thread.sleep(100);
                return 17;
            });
            List<Integer> list = streamable.joinWhen(stream -> stream.filter(task -> task.state() == State.SUCCESS).map(Subtask::get).toList());
            System.out.println(list);  // [17, 12]
        }
  • find the first subtask to finish (whether it succeeds or fails)
    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
            streamable.fork(() -> {
                Thread.sleep(1_000);
                return 12;
            });
            streamable.fork(() -> {
                Thread.sleep(100);
                return 17;
            });
            Optional<Subtask<Integer>> first = streamable.joinWhen(Stream::findFirst);
            System.out.println(first);  // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
        }

Internally, handleCompleted posts each subtask into a queue, which is read by the Stream's spliterator inside joinWhen.
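The queue-plus-spliterator idea can be tried outside the patch. The following is a minimal sketch, not the code from this PR: it assumes Java 17 or later, replaces StructuredTaskScope with a plain ExecutorService, and uses hypothetical names (CompletionStreamSketch, Outcome, and a static joinWhen that takes the tasks as a list). Each finished task posts its outcome into a blocking queue, and a spliterator hands those outcomes to the stream in completion order.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for the patch: a plain executor plays the role of the scope.
public final class CompletionStreamSketch {

    // Hypothetical result holder, loosely playing the role of Subtask<T>.
    public record Outcome<T>(T result, Throwable exception) {
        public boolean succeeded() {
            return exception == null;
        }
    }

    public static <T, U> U joinWhen(List<Callable<T>> tasks,
                                    Function<? super Stream<Outcome<T>>, ? extends U> mapper)
            throws InterruptedException {
        BlockingQueue<Outcome<T>> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (Callable<T> task : tasks) {
                // Producer side: every task posts exactly one outcome when it ends.
                executor.execute(() -> {
                    Outcome<T> outcome;
                    try {
                        outcome = new Outcome<>(task.call(), null);
                    } catch (Throwable t) {
                        outcome = new Outcome<>(null, t);
                    }
                    queue.add(outcome);
                });
            }
            // Consumer side: the stream pulls outcomes in completion order.
            var spliterator = new Spliterators.AbstractSpliterator<Outcome<T>>(
                    tasks.size(), Spliterator.ORDERED) {
                private int remaining = tasks.size();

                @Override
                public boolean tryAdvance(Consumer<? super Outcome<T>> action) {
                    if (remaining == 0) {
                        return false; // every task has already been delivered
                    }
                    try {
                        action.accept(queue.take()); // blocks until a task finishes
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // end the stream early
                        return false;
                    }
                    remaining--;
                    return true;
                }
            };
            U result = mapper.apply(StreamSupport.stream(spliterator, false));
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            return result;
        } finally {
            // Interrupt tasks still running after a short-circuit.
            executor.shutdownNow();
        }
    }
}
```

Because the spliterator stops after tasks.size() elements, a short-circuiting operation such as findFirst returns as soon as one outcome is available, and the finally block then interrupts whatever is still running.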

The current implementation is not the right one. Instead of introducing a new method into the Flock that can wait on shutdown, on threadCount == 0, and on the queue receiving a new subtask, this implementation shuts down the flock early and does not implement shutdown(), so it only has to check whether the number of tasks is zero.


Progress

  • Change must not contain extraneous whitespace

Reviewing

Using git

Checkout this PR locally:
$ git fetch https://git.openjdk.org/loom.git pull/201/head:pull/201
$ git checkout pull/201

Update a local copy of the PR:
$ git checkout pull/201
$ git pull https://git.openjdk.org/loom.git pull/201/head

Using Skara CLI tools

Checkout this PR locally:
$ git pr checkout 201

View PR using the GUI difftool:
$ git pr show -t 201

Using diff file

Download this PR as a diff file:
https://git.openjdk.org/loom/pull/201.diff


@bridgekeeper

bridgekeeper bot commented Sep 1, 2023

👋 Welcome back forax! A progress list of the required criteria for merging this PR into fibers will be added to the body of your pull request. There are additional pull request commands available for use with this pull request.

@openjdk

openjdk bot commented Sep 1, 2023

@forax This change now passes all automated pre-integration checks.

ℹ️ This project also has non-automated pre-integration requirements. Please see the file CONTRIBUTING.md for details.

After integration, the commit message for the final commit will be:

WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

You can use pull request commands such as /summary, /contributor and /issue to adjust it as needed.

At the time when this comment was updated there had been no new commits pushed to the fibers branch. If another commit should be pushed before you perform the /integrate command, your PR will be automatically rebased. If you prefer to avoid any potential automatic rebasing, please check the documentation for the /integrate command for further details.

As you do not have Committer status in this project an existing Committer must agree to sponsor your change.

➡️ To flag this PR as ready for integration with the above commit message, type /integrate in a new comment. (Afterwards, your sponsor types /sponsor in a new comment to perform the integration).

@openjdk

openjdk bot commented Sep 1, 2023

⚠️ @forax a branch with the same name as the source branch for this pull request (fibers) is present in the target repository. If you eventually integrate this pull request then the branch fibers in your personal fork will diverge once you sync your personal fork with the upstream repository.

To avoid this situation, create a new branch for your changes and reset the fibers branch. You can do this by running the following commands in a local repository for your personal fork. Note: you do not have to name the new branch NEW-BRANCH-NAME.

$ git checkout -b NEW-BRANCH-NAME
$ git branch -f fibers bf5d8122254115c81f026aff8fe882c22612feb5
$ git push -f origin fibers

Then proceed to create a new pull request with NEW-BRANCH-NAME as the source branch and close this one.

@openjdk openjdk bot added ready Ready to be integrated rfr Ready for review labels Sep 1, 2023
@forax forax closed this Sep 1, 2023
@mlbridge

mlbridge bot commented Sep 1, 2023

Webrevs

@mlbridge

mlbridge bot commented Sep 2, 2023

Mailing list message from Robert Engels on loom-dev:

Why use streams for this all?

Why not add methods like getFirst() and getAll(). Much easier api to work with - especially if you want to expose exceptions.

@mlbridge

mlbridge bot commented Sep 2, 2023

Mailing list message from Remi Forax on loom-dev:

----- Original Message -----

From: "Robert Engels" <rengels at ix.netcom.com>
To: "Rémi Forax" <forax at openjdk.org>
Cc: "loom-dev" <loom-dev at openjdk.org>
Sent: Friday, September 1, 2023 3:17:39 PM
Subject: Re: RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

Why use streams for this all?

Why not add methods like getFirst() and getAll(). Much easier api to work with -
especially if you want to expose exceptions.

getFirst() and getAll() are already here, they are named ShutdownOnFailure and ShutdownOnSuccess;
this third subclass is for the use cases where the semantics are a little more complex than just getFirst() and getAll(), like the first two (using Stream.limit()).

regards,
Rémi
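The "first two" case mentioned in the reply above can be approximated with standard JDK classes only. This is a rough sketch under stated assumptions, not the Streamable API from the PR: the class FirstTwoSketch and its methods are hypothetical names, it assumes Java 17 or later, it uses ExecutorCompletionService in place of StructuredTaskScope, and it assumes tasks never return null.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the patch.
public final class FirstTwoSketch {

    // Returns the results of the first `count` tasks that complete successfully,
    // in completion order, then interrupts the tasks that are still running.
    public static <T> List<T> firstSuccessful(List<Callable<T>> tasks, int count) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<T> completions = new ExecutorCompletionService<>(executor);
            for (Callable<T> task : tasks) {
                completions.submit(task);
            }
            return Stream.generate(() -> nextResult(completions))
                    .limit(tasks.size())          // never wait for more tasks than were submitted
                    .filter(Optional::isPresent)  // skip failed tasks
                    .map(Optional::get)
                    .limit(count)                 // short-circuit once enough values arrived
                    .toList();
        } finally {
            executor.shutdownNow();
        }
    }

    // Waits for the next finished task; a failed task is reported as an empty Optional.
    private static <T> Optional<T> nextResult(CompletionService<T> completions) {
        try {
            return Optional.of(completions.take().get());
        } catch (ExecutionException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a task", e);
        }
    }
}
```

The second limit() is what provides the short-circuit: once `count` successful values have been seen, the stream stops pulling from the completion service and the remaining tasks are interrupted by shutdownNow().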

Labels
ready Ready to be integrated rfr Ready for review

1 participant