Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1565: Only poll for updated MRs from GitLab #1369

Closed
wants to merge 17 commits into from
69 changes: 48 additions & 21 deletions bots/csr/src/main/java/org/openjdk/skara/bots/csr/CSRIssueBot.java
Expand Up @@ -26,10 +26,12 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.openjdk.skara.bot.Bot;
import org.openjdk.skara.bot.WorkItem;
import org.openjdk.skara.forge.HostedRepository;
Expand All @@ -49,10 +51,12 @@ public class CSRIssueBot implements Bot {
// Keeps track of updatedAt timestamps from the previous call to getPeriodicItems,
// so we can avoid re-evaluating issues that are returned again without any actual
// update.
private Map<String, ZonedDateTime> issueUpdatedAt = new HashMap<>();
private Map<String, ZonedDateTime> issueUpdatedAt = Map.of();
// The last found updatedAt from any issue.
private ZonedDateTime lastUpdatedAt;

private final List<Issue> retryIssues = new ArrayList<>();

public CSRIssueBot(IssueProject issueProject, List<HostedRepository> repositories) {
this.issueProject = issueProject;
this.repositories = repositories;
Expand All @@ -65,7 +69,6 @@ public String toString() {

@Override
public List<WorkItem> getPeriodicItems() {
var ret = new ArrayList<WorkItem>();
// In the very first round, we just find the last updated issue to
// initialize lastUpdatedAt. There is no need for reacting to any CSR
// issue update before that, as the CSRPullRequestBot will go through
Expand All @@ -75,7 +78,7 @@ public List<WorkItem> getPeriodicItems() {
if (lastUpdatedIssue.isPresent()) {
Issue issue = lastUpdatedIssue.get();
lastUpdatedAt = issue.updatedAt();
issueUpdatedAt.put(issue.id(), issue.updatedAt());
issueUpdatedAt = Map.of(issue.id(), issue.updatedAt());
log.fine("Setting lastUpdatedAt from last updated issue " + issue.id() + " updated at " + lastUpdatedAt);
} else {
// If no previous issue was found, initiate lastUpdatedAt to something far
Expand All @@ -84,28 +87,52 @@ public List<WorkItem> getPeriodicItems() {
lastUpdatedAt = ZonedDateTime.ofInstant(Instant.EPOCH, ZoneId.systemDefault());
log.warning("No CSR issue found, setting lastUpdatedAt to " + lastUpdatedAt);
}
return ret;
return List.of();
}

var newIssuesUpdatedAt = new HashMap<String, ZonedDateTime>();
var issues = issueProject.csrIssues(lastUpdatedAt);
for (var issue : issues) {
newIssuesUpdatedAt.put(issue.id(), issue.updatedAt());
// Update the lastUpdatedAt value with the highest found value for next call
if (issue.updatedAt().isAfter(lastUpdatedAt)) {
lastUpdatedAt = issue.updatedAt();
}
var lastUpdate = issueUpdatedAt.get(issue.id());
if (lastUpdate != null) {
if (!issue.updatedAt().isAfter(lastUpdate)) {
continue;
}
}
var issueWorkItem = new IssueWorkItem(this, issue);
ret.add(issueWorkItem);
if (!issues.isEmpty()) {
lastUpdatedAt = issues.stream()
.map(Issue::updatedAt)
.max(Comparator.naturalOrder())
.orElseThrow(() -> new RuntimeException("No updatedAt field found in any Issue"));
}
var newIssuesUpdatedAt = issues.stream()
.collect(Collectors.toMap(Issue::id, Issue::updatedAt));

var filtered = issues.stream()
.filter(i -> !issueUpdatedAt.containsKey(i.id()) || i.updatedAt().isAfter(issueUpdatedAt.get(i.id())))
.toList();

var withRetries = addRetries(filtered);

var workItems = withRetries.stream()
.map(i -> (WorkItem) new IssueWorkItem(this, i, e -> this.retryIssue(i)))
.toList();

issueUpdatedAt = newIssuesUpdatedAt;
return ret;

return workItems;
}

/**
 * Queues an issue so that it is included again the next time addRetries is
 * consulted from getPeriodicItems. Used as the error callback of the work items
 * created for that issue.
 *
 * @param issue the issue whose processing should be attempted again
 */
private void retryIssue(Issue issue) {
    // Lock on the bot instance, which is the monitor a synchronized instance
    // method such as addRetries holds.
    synchronized (this) {
        retryIssues.add(issue);
    }
}

/**
 * Merges the issues queued for retry into the list of freshly polled issues and
 * empties the retry queue.
 * <p>
 * A queued issue is only added if no issue with the same id is already part of
 * the result. This covers both issues that were polled again in this round and
 * issues that were queued more than once, which happens when several work items
 * spawned from the same issue fail and each invokes the shared error handler.
 *
 * @param issues the issues found by the current poll
 * @return {@code issues} itself if nothing had to be added, otherwise a new
 *         unmodifiable list holding the polled issues followed by the retries
 */
private synchronized List<Issue> addRetries(List<Issue> issues) {
    if (retryIssues.isEmpty()) {
        return issues;
    }
    var merged = new ArrayList<>(issues);
    for (var retry : retryIssues) {
        // Check against everything merged so far, not just the polled issues,
        // so that an issue queued several times only yields one entry.
        var alreadyPresent = merged.stream().anyMatch(i -> retry.id().equals(i.id()));
        if (!alreadyPresent) {
            merged.add(retry);
        }
    }
    retryIssues.clear();
    // Hand back the caller's list untouched when no retry was actually added.
    return merged.size() == issues.size() ? issues : List.copyOf(merged);
}

@Override
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.openjdk.skara.bot.*;
import org.openjdk.skara.forge.HostedRepository;
import org.openjdk.skara.forge.PullRequest;
import org.openjdk.skara.forge.PullRequestPoller;
import org.openjdk.skara.issuetracker.IssueProject;

import java.util.*;
Expand All @@ -40,19 +41,12 @@ class CSRPullRequestBot implements Bot {
private final Logger log = Logger.getLogger("org.openjdk.skara.bots.csr");
private final HostedRepository repo;
private final IssueProject project;
// Keeps track of updatedAt timestamps from the previous call to getPeriodicItems,
// so we can avoid re-evaluating PRs that are returned again without any actual
// update. This is needed because timestamp based searches aren't exact enough
// to avoid sometimes receiving the same items multiple times.
private Map<String, ZonedDateTime> prsUpdatedAt = new HashMap<>();
// The last found updatedAt in any returned PR. Used for limiting results on the
// next call to the hosted repo. Should only contain timestamps originating
// from the remote repo to avoid problems with mismatched clocks.
private ZonedDateTime lastUpdatedAt;
private final PullRequestPoller poller;

/**
 * Creates a bot for the given repository and issue project, and sets up the
 * PullRequestPoller that this bot keeps for the repository.
 */
CSRPullRequestBot(HostedRepository repo, IssueProject project) {
this.repo = repo;
this.project = project;
// NOTE(review): the meaning of the three boolean flags is not visible here;
// confirm against the PullRequestPoller constructor.
this.poller = new PullRequestPoller(repo, false, false, false);
}

@Override
Expand All @@ -62,28 +56,12 @@ public String toString() {

@Override
public List<WorkItem> getPeriodicItems() {
var items = new ArrayList<WorkItem>();
log.info("Fetching all open pull requests for " + repo.name());
Map<String, ZonedDateTime> newPrsUpdatedAt = new HashMap<>();
// On the first run we have to re-evaluate all open PRs, after that, only
// looking at PRs that have been updated should be enough.
var prs = lastUpdatedAt != null ? repo.openPullRequestsAfter(lastUpdatedAt) : repo.pullRequests();
for (PullRequest pr : prs) {
newPrsUpdatedAt.put(pr.id(), pr.updatedAt());
// Update lastUpdatedAt with the last found updatedAt for the next call
if (lastUpdatedAt == null || pr.updatedAt().isAfter(lastUpdatedAt)) {
lastUpdatedAt = pr.updatedAt();
}
var lastUpdate = prsUpdatedAt.get(pr.id());
if (lastUpdate != null) {
if (!pr.updatedAt().isAfter(lastUpdate)) {
continue;
}
}
var pullRequestWorkItem = new PullRequestWorkItem(repo, pr.id(), project, pr.updatedAt());
items.add(pullRequestWorkItem);
}
prsUpdatedAt = newPrsUpdatedAt;
var prs = poller.updatedPullRequests();
var items = prs.stream()
.map(pr -> (WorkItem) new PullRequestWorkItem(repo, pr.id(), project,
e -> poller.retryPullRequest(pr), pr.updatedAt()))
.toList();
poller.lastBatchHandled();
return items;
}

Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.logging.Logger;
import java.util.stream.Stream;
import org.openjdk.skara.bot.WorkItem;
Expand All @@ -46,9 +47,12 @@ class IssueWorkItem implements WorkItem {
private final CSRIssueBot bot;
private final Issue csrIssue;

public IssueWorkItem(CSRIssueBot bot, Issue csrIssue) {
private final Consumer<RuntimeException> errorHandler;

public IssueWorkItem(CSRIssueBot bot, Issue csrIssue, Consumer<RuntimeException> errorHandler) {
this.bot = bot;
this.csrIssue = csrIssue;
this.errorHandler = errorHandler;
}

@Override
Expand Down Expand Up @@ -83,7 +87,8 @@ public Collection<WorkItem> run(Path scratchPath) {
.filter(Issue::isOpen)
// This will mix time stamps from the IssueTracker and the Forge hosting PRs, but it's the
// best we can do.
.map(pr -> new PullRequestWorkItem(pr.repository(), pr.id(), csrIssue.project(), csrIssue.updatedAt()))
.map(pr -> new PullRequestWorkItem(pr.repository(), pr.id(), csrIssue.project(),
errorHandler, csrIssue.updatedAt()))
.forEach(ret::add);
return ret;
}
Expand All @@ -97,4 +102,9 @@ public String botName() {
public String workItemName() {
return "issue";
}

/**
 * Passes the given exception on to the errorHandler that was supplied when this
 * work item was constructed.
 */
@Override
public void handleRuntimeException(RuntimeException e) {
errorHandler.accept(e);
}
}
Expand Up @@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openjdk.skara.bot.WorkItem;
Expand All @@ -53,6 +54,7 @@ class PullRequestWorkItem implements WorkItem {
private final HostedRepository repository;
private final String prId;
private final IssueProject project;
final Consumer<RuntimeException> errorHandler;
/**
* The updatedAt timestamp of the external entity that triggered this WorkItem,
* which would be either a PR or a CSR Issue. Used for tracking reaction latency
Expand All @@ -61,10 +63,11 @@ class PullRequestWorkItem implements WorkItem {
private final ZonedDateTime triggerUpdatedAt;

public PullRequestWorkItem(HostedRepository repository, String prId, IssueProject project,
ZonedDateTime triggerUpdatedAt) {
Consumer<RuntimeException> errorHandler, ZonedDateTime triggerUpdatedAt) {
this.repository = repository;
this.prId = prId;
this.project = project;
this.errorHandler = errorHandler;
this.triggerUpdatedAt = triggerUpdatedAt;
}

Expand Down Expand Up @@ -253,4 +256,9 @@ public String botName() {
public String workItemName() {
return "pr";
}

/**
 * Passes the given exception on to the errorHandler that was supplied when this
 * work item was constructed.
 */
@Override
public void handleRuntimeException(RuntimeException e) {
errorHandler.accept(e);
}
}
12 changes: 12 additions & 0 deletions forge/src/main/java/org/openjdk/skara/forge/Forge.java
Expand Up @@ -22,6 +22,7 @@
*/
package org.openjdk.skara.forge;

import java.time.Duration;
import org.openjdk.skara.host.*;
import org.openjdk.skara.json.JSONObject;
import org.openjdk.skara.vcs.Hash;
Expand All @@ -43,6 +44,17 @@ public interface Forge extends Host {
Optional<HostedRepository> repository(String name);
Optional<HostedCommit> search(Hash hash);

/**
 * Reports how coarse the "updated_at" timestamps of this forge are.
 * <p>
 * Some forges do not refresh the "updated_at" field of an object on every
 * change. The returned Duration is the shortest interval at which the field is
 * updated, and queries that filter on the field need to take it into account.
 * The default is a zero-length Duration, meaning no special consideration is
 * needed.
 *
 * @return the shortest update interval of the "updated_at" field
 */
default Duration minTimeStampUpdateInterval() {
    return Duration.ZERO;
}

static Forge from(String name, URI uri, Credential credential, JSONObject configuration) {
var factory = ForgeFactory.getForgeFactories().stream()
.filter(f -> f.name().equals(name))
Expand Down