Go语言任务调度

  • A+
所属分类:技术

chatGPT账号

Go语言任务调度

Goroutine简介

  • 定义:Goroutine是一种轻量级线程,由Go运行时管理。它比操作系统级别的线程更轻量,创建成本低,切换速度快。
  • 创建方式:通过go关键字启动一个新goroutine。
go func() {
    fmt.Println("Hello from goroutine")
}()

Channel的作用

  • 定义:Channel是goroutine之间通信的管道,用于发送和接收数据。
  • 声明与使用
ch := make(chan int) // 声明一个整型channel
ch <- 42             // 向channel发送数据
x := <-ch            // 从channel接收数据
fmt.Println(x)       // 输出: 42

同步与异步

  • 同步:等待一个操作完成后再执行下一步。
  • 异步:不等待操作完成,直接执行下一步。

并发模式

生产者消费者模型:通过channel连接生产者和消费者。

func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for n := range ch {
        fmt.Println(n)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

工作池:通过固定数量的goroutine处理任务队列。

type WorkerPool struct {
    jobs  chan int
    done  chan bool
    count int
}

func NewWorkerPool(n int) *WorkerPool {
    return &WorkerPool{
        jobs:  make(chan int, 100),
        done:  make(chan bool, n),
        count: n,
    }
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.count; i++ {
        go func() {
            for job := range p.jobs {
                fmt.Println("Working on", job)
                time.Sleep(time.Second)
            }
            p.done <- true
        }()
    }
}

func (p *WorkerPool) AddJob(jobs ...int) {
    for _, job := range jobs {
        p.jobs <- job
    }
}

func (p *WorkerPool) Wait() {
    for i := 0; i < p.count; i++ {
        <-p.done
    }
    close(p.jobs)
}

func main() {
    wp := NewWorkerPool(5)
    wp.Start()
    wp.AddJob(1, 2, 3, 4, 5)
    wp.Wait()
}

超时控制

使用context包可以方便地设置超时。

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

select {
case <-ctx.Done():
    fmt.Println("Timed out")
case result := <-ch:
    fmt.Println("Result:", result)
}

错误处理

在并发程序中,错误处理尤为重要。

func doWork(ch chan<- error) {
    defer func() {
        if r := recover(); r != nil {
            ch <- errors.New("panic occurred")
        }
    }()

    // 可能引发panic的操作
    fmt.Println(1 / 0)
}

func main() {
    ch := make(chan error)
    go doWork(ch)
    if err := <-ch; err != nil {
        log.Fatal(err)
    }
}

性能考量

  • Goroutine的数量不宜过多,否则会导致上下文切换频繁,影响性能。
  • Channel的缓冲区大小需合理设置,过大会占用内存,过小则可能造成goroutine阻塞。

实战案例

假设我们需要开发一个简单的Web服务器,该服务器能够同时处理多个客户端请求,并且每个请求都可能触发一些耗时较长的操作(如数据库查询)。我们可以利用goroutine和channel来实现这一功能:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func handleRequest(w http.ResponseWriter, r *http.Request) {
    // 模拟耗时操作
    ch := make(chan string)
    go func() {
        time.Sleep(2 * time.Second)
        ch <- "Hello, World!"
    }()

    // 等待结果返回
    result := <-ch
    fmt.Fprintln(w, result)
}

func main() {
    http.HandleFunc("/", handleRequest)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Select语句

Select语句:类似于多路复用器,可以在多个channel操作中选择一个准备好的操作执行。

ch1 := make(chan int)
ch2 := make(chan int)

go func() {
    time.Sleep(1 * time.Second)
    ch1 <- 1
}()

go func() {
    time.Sleep(2 * time.Second)
    ch2 <- 2
}()

select {
case v := <-ch1:
    fmt.Println("Received from ch1:", v)
case v := <-ch2:
    fmt.Println("Received from ch2:", v)
}

  • Default case:当没有channel准备好时,默认分支会执行。
ch := make(chan int)

select {
case v := <-ch:
    fmt.Println("Received:", v)
default:
    fmt.Println("No data received yet")
}

Context包

Context包:用于传递取消信号、截止时间和其他请求范围的数据。

package main

import (
    "context"
    "fmt"
    "log"
    "time"
)

func doWork(ctx context.Context, ch chan<- string) {
    select {
    case <-ctx.Done():
        ch <- "Operation canceled"
        return
    case <-time.After(2 * time.Second):
        ch <- "Work completed"
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan string)

    go doWork(ctx, ch)

    time.Sleep(1 * time.Second)
    cancel() // 取消操作

    result := <-ch
    fmt.Println(result)
}

工作窃取(Work Stealing)

工作窃取:一种优化策略,允许空闲的goroutine从其他繁忙的goroutine那里“窃取”任务。

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func worker(id int, tasks chan int, results chan<- int) {
    for task := range tasks {
        fmt.Printf("Worker %d processing task %d\n", id, task)
        time.Sleep(1 * time.Second)
        results <- task * task
    }
    wg.Done()
}

func main() {
    numWorkers := 5
    numTasks := 10

    tasks := make(chan int, numTasks)
    results := make(chan int, numTasks)

    wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
        go worker(i, tasks, results)
    }

    for i := 1; i <= numTasks; i++ {
        tasks <- i
    }
    close(tasks)

    wg.Wait()

    for i := 0; i < numTasks; i++ {
        fmt.Println(<-results)
    }
}

闭包与匿名函数

闭包:允许goroutine访问外部变量。

package main

import (
    "fmt"
    "time"
)

func main() {
    values := []int{1, 2, 3, 4, 5}

    for _, value := range values {
        go func(v int) {
            time.Sleep(1 * time.Second)
            fmt.Println(v)
        }(value)
    }

    time.Sleep(2 * time.Second) // 确保所有goroutine完成
}

通道容量与同步

通道容量:带缓冲的channel可以减少goroutine之间的同步开销。

ch := make(chan int, 2) // 容量为2的channel

ch <- 1
ch <- 2

fmt.Println(<-ch)
fmt.Println(<-ch)

同步问题:如果多个goroutine尝试同时访问共享资源,可能会导致竞态条件。

var count int
var mutex sync.Mutex

func increment() {
    mutex.Lock()
    count++
    mutex.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increment()
        }()
    }

    wg.Wait()
    fmt.Println(count) // 应输出1000
}

并发安全的数据结构

并发安全的容器:Go标准库提供了多种并发安全的数据结构。

package main

import (
    "container/list"
    "fmt"
    "sync"
)

type SafeList struct {
    list *list.List
    lock sync.Mutex
}

func (s *SafeList) PushFront(v interface{}) {
    s.lock.Lock()
    defer s.lock.Unlock()
    s.list.PushFront(v)
}

func (s *SafeList) Front() interface{} {
    s.lock.Lock()
    defer s.lock.Unlock()
    if s.list.Len() == 0 {
        return nil
    }
    return s.list.Front().Value
}

func main() {
    l := &SafeList{list: list.New()}
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            l.PushFront(i)
        }(i)
    }

    wg.Wait()
    fmt.Println(l.Front()) // 应输出9
}

免责声明

发文时比特币价格:$71249

当前比特币价格:[crypto coins=”BTC” type=”text” show=”price”]

当前比特币涨幅:[crypto coins=”BTC” type=”text” show=”percent”]

免责声明:

本文不代表路远网立场,且不构成投资建议,请谨慎对待。用户由此造成的损失由用户自行承担,与路远网没有任何关系;

路远网不对网站所发布内容的准确性,真实性等任何方面做任何形式的承诺和保障;

网站内所有涉及到的区块链(衍生)项目,路远网对项目的真实性,准确性等任何方面均不做任何形式的承诺和保障;

网站内所有涉及到的区块链(衍生)项目,路远网不对其构成任何投资建议,用户由此造成的损失由用户自行承担,与路远网没有任何关系;

路远区块链研究院声明:路远区块链研究院内容由路远网发布,部分来源于互联网和行业分析师投稿收录,内容为路远区块链研究院加盟专职分析师独立观点,不代表路远网立场。

  • 我的微信
  • 这是我的微信扫一扫
  • weinxin
  • 我的电报
  • 这是我的电报扫一扫
  • weinxin
chatGPT账号
路远

发表评论

您必须登录才能发表评论!